
[GLUTEN-9456][VL] Add custom direct buffered input #11452

Merged
rui-mo merged 2 commits into apache:main from rui-mo:wip_dbi
Jan 30, 2026

Conversation

@rui-mo
Contributor

@rui-mo rui-mo commented Jan 20, 2026

What changes are proposed in this pull request?

Add custom direct buffered input and input stream to allow additional control over how input buffers are created and managed.
Follow-up for: #11249.

How was this patch tested?

under test

Related issue: #9456

@github-actions github-actions bot added the VELOX label Jan 20, 2026
@rui-mo rui-mo changed the title [VL]: Add custom direct buffered input [VL] Add custom direct buffered input Jan 20, 2026
@rui-mo rui-mo changed the title [VL] Add custom direct buffered input [GLUTEN-9456][VL] Add custom direct buffered input Jan 20, 2026

namespace gluten {

class GlutenDirectBufferedInput : public facebook::velox::dwio::common::BufferedInput {
Contributor

Can we extend DirectBufferedInput and only override the functions we need?

Contributor Author

We cannot do that if we want to clear requests_, because it is private in DirectBufferedInput.

  ~GlutenDirectBufferedInput() override {
    requests_.clear();
    for (auto& load : coalescedLoads_) {
      if (load->state() == facebook::velox::cache::CoalescedLoad::State::kLoading) {
        folly::SemiFuture<bool> waitFuture(false);
        if (!load->loadOrFuture(&waitFuture)) {
          auto& exec = folly::QueuedImmediateExecutor::instance();
          std::move(waitFuture).via(&exec).wait();
        }
      }
      load->cancel();
    }
    coalescedLoads_.clear();
  }

Contributor

Let's merge this PR first, and submit a follow-up PR to Velox to change those members to protected, along with other related changes.

Contributor Author

The required change has been merged into Velox, so we can make a fairly clean update now.

@FelixYBW
Contributor

FelixYBW commented Jan 30, 2026

Thank you, Rui. Let's remove IBM/velox@2c1c467 once the PR is merged.

@j7nhai You may close PR14722

@rui-mo rui-mo merged commit 306de37 into apache:main Jan 30, 2026
58 checks passed
@ayushi-agarwal
Contributor

@rui-mo Can there still be race condition scenarios after this change where we can hit this issue?

@rui-mo
Contributor Author

rui-mo commented Feb 20, 2026

The Velox community has pointed out the possibility of a deadlock in scenarios like facebookincubator/velox#15625, while this approach of waiting for asynchronous I/O in the table scan destructor will not cause a deadlock in Gluten, because Gluten does not use Velox’s arbitration lock.

Hi @ayushi-agarwal, are you referring to scenarios like the one above, or is there another case you encountered?

@ayushi-agarwal
Contributor

@rui-mo I am referring to the original issue of a memory leak in table scan, where the application was failing because async threads were still holding the memory pool reference (#9456). After this fix, is there still a possibility of hitting this issue?

@FelixYBW
Contributor

@rui-mo I am referring to the original issue of a memory leak in table scan, where the application was failing because async threads were still holding the memory pool reference (#9456). After this fix, is there still a possibility of hitting this issue?

No. Feel free to open an issue if you hit it again.

@ayushi-agarwal
Contributor

@rui-mo @FelixYBW We see this issue again. We added a few log lines, so we know execution enters the new destructor.
The code went into it and came out of it; reserved bytes are 0, but numPools is still 4, which is why it fails.
I20260224 14:23:26.637977 108415 VeloxMemoryManager.cc:402] There are still outstanding Velox memory allocations. Waiting for 14550 ms to let possible async tasks done...
E20260224 14:23:26.645238 108415 VeloxMemoryManager.cc:408] Failed to release Velox memory manager after 43350ms as there are still outstanding memory resources.
E20260224 14:23:26.645354 108415 Exceptions.h:66] Line: /root/incubator-gluten/ep/build-velox/build/velox_ep/velox/common/memory/Memory.cpp:153, Function:~MemoryManager, Expression: pools_.size() != 0 (1 vs 0). There are unexpected alive memory pools allocated by user on memory manager destruction:
Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4
List of root pools:
sys_root usage 0B reserved 0B peak 0B
__sys_shared_leaf__0 usage 0B reserved 0B peak 0B
sys_tracing usage 0B reserved 0B peak 0B
sys_caching usage 0B reserved 0B peak 0B
sys_spilling usage 0B reserved 0B peak 0B
root usage 0B reserved 0B peak 44.00MB
task.Gluten_Stage_7_TID_1204_VTID_27 usage 0B reserved 0B peak 43.00MB
node.0 usage 0B reserved 0B peak 10.00MB
op.0.0.0.TableScan usage 0B reserved 0B peak 9.99MB
refcount 3
Memory Allocator[MALLOC capacity UNLIMITED allocated bytes 1048576 allocated pages 0 mapped pages 0]
ARBITRATOR[GLUTEN] CAPACITY 8388608.00TB numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 0B freeCapacity 0B freeReservedCapacity 0B], Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
what(): Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: pools_.size() != 0 (1 vs 0). There are unexpected alive memory pools allocated by user on memory manager destruction:
Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4
List of root pools:
sys_root usage 0B reserved 0B peak 0B
__sys_shared_leaf__0 usage 0B reserved 0B peak 0B
sys_tracing usage 0B reserved 0B peak 0B
sys_caching usage 0B reserved 0B peak 0B
sys_spilling usage 0B reserved 0B peak 0B
root usage 0B reserved 0B peak 44.00MB
task.Gluten_Stage_7_TID_1204_VTID_27 usage 0B reserved 0B peak 43.00MB
node.0 usage 0B reserved 0B peak 10.00MB
op.0.0.0.TableScan usage 0B reserved 0B peak 9.99MB
refcount 3
Memory Allocator[MALLOC capacity UNLIMITED allocated bytes 1048576 allocated pages 0 mapped pages 0]
ARBITRATOR[GLUTEN] CAPACITY 8388608.00TB numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 0B freeCapacity 0B freeReservedCapacity 0B]
Retriable: False
Function: ~MemoryManager
File: /root/incubator-gluten/ep/build-velox/build/velox_ep/velox/common/memory/Memory.cpp
Line: 153
Stack trace:

# 0  _ZN8facebook5velox7process10StackTraceC1Ei
# 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
# 3  0x0000000001bf53d9
# 4  _ZN6gluten18VeloxMemoryManagerD1Ev
# 5  _ZN6gluten18VeloxMemoryManagerD0Ev
# 6  _ZN6gluten13MemoryManager7releaseEPS0_
# 7  Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_release
# 8  0x00007fd1a7a0f8f0

@rui-mo
Contributor Author

rui-mo commented Feb 24, 2026

Hi @ayushi-agarwal, I’ve learned that one of our customers is still encountering this issue after the fix was applied. It turns out their environment has I/O throttling and the bandwidth is saturated, which causes the following code to never exit.

if (!load->loadOrFuture(&waitFuture)) {
  auto& exec = folly::QueuedImmediateExecutor::instance();
  std::move(waitFuture).via(&exec).wait();
}

Do you have any similar throttling or bandwidth limits in your environment? Can you please also confirm if the program never exits from the above code?

@ayushi-agarwal
Contributor

Thanks @rui-mo for the quick reply. I added a log line here, and it never got printed, so I think the program exited the destructor without waiting:
if (load->state() == facebook::velox::cache::CoalescedLoad::State::kLoading) {
  LOG(WARNING) << "Waiting for coalesced load to finish";

@rui-mo
Contributor Author

rui-mo commented Feb 24, 2026

@ayushi-agarwal Gluten has a configuration called spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping that controls the waiting time of destructor.
Could you try increasing this value to see if it helps avoid the issue?

@ayushi-agarwal
Contributor

@rui-mo I tried increasing that config to 75s, and for some cases I had to increase it to 120s. It helped, but we didn't want to increase it so much that it starts affecting query latency.

@rui-mo
Contributor Author

rui-mo commented Feb 24, 2026

@ayushi-agarwal If increasing the wait time takes effect, the problem is likely still related to std::move(waitFuture).via(&exec).wait();. Could you check the log level and make sure the logs are being printed correctly?

@ayushi-agarwal
Contributor

@rui-mo I also added a log line at the end of the destructor, and it got printed as well.

I see the refcount of the pool also increasing over time. I added these log lines in the Velox memory manager's tryDestructSafe loop, and at the end you can see the refcount:

Memory Allocator[MALLOC capacity UNLIMITED allocated bytes 1048576 allocated pages 0 mapped pages 0]
ARBITRATOR[GLUTEN] CAPACITY 8388608.00TB numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 0B freeCapacity 0B freeReservedCapacity 0B]
E20260224 18:18:46.178891 138102 VeloxMemoryManager.cc:345] Attempt to destruct VeloxMemoryManager failed because there are 4 outstanding memory pools.
I20260224 18:18:46.178897 138102 VeloxMemoryManager.cc:402] There are still outstanding Velox memory allocations. Waiting for 9700 ms to let possible async tasks done...
E20260224 18:18:46.183820 138102 VeloxMemoryManager.cc:344] Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4
List of root pools:
sys_root usage 0B reserved 0B peak 0B
__sys_shared_leaf__0 usage 0B reserved 0B peak 0B
sys_tracing usage 0B reserved 0B peak 0B
sys_caching usage 0B reserved 0B peak 0B
sys_spilling usage 0B reserved 0B peak 0B
root usage 0B reserved 0B peak 74.00MB
task.Gluten_Stage_112_TID_24630_VTID_804 usage 0B reserved 0B peak 73.00MB
node.0 usage 0B reserved 0B peak 72.00MB
op.0.0.0.TableScan usage 0B reserved 0B peak 70.72MB
refcount 2

Memory Allocator[MALLOC capacity UNLIMITED allocated bytes 1048576 allocated pages 0 mapped pages 0]
ARBITRATOR[GLUTEN] CAPACITY 8388608.00TB numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 0B freeCapacity 0B freeReservedCapacity 0B]
E20260224 18:18:46.183887 138102 VeloxMemoryManager.cc:345] Attempt to destruct VeloxMemoryManager failed because there are 4 outstanding memory pools.
I20260224 18:18:46.183890 138102 VeloxMemoryManager.cc:402] There are still outstanding Velox memory allocations. Waiting for 14550 ms to let possible async tasks done...
E20260224 18:18:46.188819 138102 VeloxMemoryManager.cc:408] Failed to release Velox memory manager after 43350ms as there are still outstanding memory resources.
E20260224 18:18:46.188876 138102 Exceptions.h:66] Line: /root/incubator-gluten/ep/build-velox/build/velox_ep/velox/common/memory/Memory.cpp:153, Function:~MemoryManager, Expression: pools_.size() != 0 (1 vs 0). There are unexpected alive memory pools allocated by user on memory manager destruction:
Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4
List of root pools:
sys_root usage 0B reserved 0B peak 0B
__sys_shared_leaf__0 usage 0B reserved 0B peak 0B
sys_tracing usage 0B reserved 0B peak 0B
sys_caching usage 0B reserved 0B peak 0B
sys_spilling usage 0B reserved 0B peak 0B
root usage 0B reserved 0B peak 74.00MB
task.Gluten_Stage_112_TID_24630_VTID_804 usage 0B reserved 0B peak 73.00MB
node.0 usage 0B reserved 0B peak 72.00MB
op.0.0.0.TableScan usage 0B reserved 0B peak 70.72MB
refcount 3

@ayushi-agarwal
Contributor

@rui-mo @FelixYBW The issue is in the sleep function being used here: usleep is not working as expected and returns as soon as it is called, so there is no sleep at all. Changing it to std::this_thread::sleep_for(std::chrono::milliseconds(waitMs)); helped, and now the default timeout of 2-3s is also enough.
VeloxMemoryManager::~VeloxMemoryManager() {
  static const uint32_t kWaitTimeoutMs = FLAGS_gluten_velox_async_timeout_on_task_stopping; // 30s by default
  uint32_t accumulatedWaitMs = 0UL;
  bool destructed = false;
  for (int32_t tryCount = 0; accumulatedWaitMs < kWaitTimeoutMs; tryCount++) {
    destructed = tryDestructSafe();
    if (destructed) {
      if (tryCount > 0) {
        LOG(INFO) << "All the outstanding memory resources successfully released. ";
      }
      break;
    }
    uint32_t waitMs = 50 * static_cast<uint32_t>(pow(1.5, tryCount)); // 50ms, 75ms, 112.5ms ...
    LOG(INFO) << "There are still outstanding Velox memory allocations. Waiting for " << waitMs
              << " ms to let possible async tasks done... ";
    usleep(waitMs * 1000);
    accumulatedWaitMs += waitMs;
  }
  if (!destructed) {
    LOG(ERROR) << "Failed to release Velox memory manager after " << accumulatedWaitMs
               << "ms as there are still outstanding memory resources. ";
  }
#ifdef ENABLE_JEMALLOC_STATS
  malloc_stats_print(nullptr, nullptr, nullptr);
#endif
}

@rui-mo
Contributor Author

rui-mo commented Feb 26, 2026

Changing it to std::this_thread::sleep_for(std::chrono::milliseconds(waitMs)); helped and now timeout of 2-3 s by default is also enough.

@ayushi-agarwal Thanks for keeping us updated. May I confirm whether the actual wait time has changed, or is this only a config change because of using a larger wait unit?

@FelixYBW
Contributor

Thank you, @ayushi-agarwal, for your confirmation. In theory, with via(exec).wait(), we don't need the usleep() here. If it still reports an error, that means either our logic has an issue here or the I/O thread isn't the one used for DBI. @rui-mo will do more investigation.

@ayushi-agarwal
Contributor

@rui-mo In our case the executor thread was getting interrupted by another monitoring tool, which is why with usleep the thread was not sleeping for the specified interval. We didn't change the wait time; we just changed the method from usleep to std::this_thread::sleep_for.

@FelixYBW From our investigation, we also think the issue is still with the I/O thread of DBI. In the loop we check for the loading state, but there can be a case where a task was already submitted to the executor pool and was in the planned state at the time we checked, and it got executed later.

@rui-mo
Contributor Author

rui-mo commented Mar 3, 2026

here in the loop we are checking for loading state but there can be a case where a task was already submitted to executor pool which was in planned state at the time we checked and which got executed later.

@ayushi-agarwal If that’s the case, could you please try #11697 to see whether it resolves the issue? Thanks!

@FelixYBW
Contributor

FelixYBW commented Mar 5, 2026

Recording this here, with thanks to @rui-mo's confirmation.

tryDestructSafe stack:

 0# gluten::VeloxMemoryManager::tryDestructSafe() in 
 1# gluten::VeloxMemoryManager::~VeloxMemoryManager() in 
 2# gluten::VeloxMemoryManager::~VeloxMemoryManager() in 
 3# gluten::MemoryManager::release(gluten::MemoryManager*) 
 4# Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_release 

DBI destruct stack:

 0# gluten::GlutenDirectBufferedInput::~GlutenDirectBufferedInput() in 
 1# gluten::GlutenDirectBufferedInput::~GlutenDirectBufferedInput() in 
 2# facebook::velox::parquet::ReaderBase::~ReaderBase() in 
 3# facebook::velox::parquet::ParquetReader::~ParquetReader() in 
 4# facebook::velox::connector::hive::SplitReader::~SplitReader() in 
 5# facebook::velox::connector::hive::SplitReader::~SplitReader() in 
 6# facebook::velox::connector::hive::HiveDataSource::~HiveDataSource() in 
 7# facebook::velox::connector::hive::HiveDataSource::~HiveDataSource() in 
 8# facebook::velox::exec::TableScan::~TableScan() in 
 9# facebook::velox::exec::TableScan::~TableScan() in 
10# facebook::velox::exec::Driver::~Driver() in 
11# std::__1::__shared_ptr_pointer<facebook::velox::exec::Driver*, std::__1::shared_ptr<facebook::velox::exec::Driver>::__shared_ptr_default_delete<facebook::velox::exec::Driver, facebook::velox::exec::Driver>, std::__1::allocator<facebook::velox::exec::Driver>>::__on_zero_shared() in 
12# facebook::velox::exec::Task::next(folly::SemiFuture<folly::Unit>*) in 
13# gluten::WholeStageResultIterator::next() in 
14# gluten::ResultIterator::hasNext() 
15# Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext 

thread:

dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
dbi thread id: 0x17670f000
velox memory manager thread id: 0x17670f000
velox memory manager thread id: 0x17670f000
velox memory manager thread id: 0x17670f000
velox memory manager thread id: 0x17670f000

They are confirmed to be in the same thread, and DBI is released before tryDestructSafe.

If everything works correctly, we shouldn't need to wait in tryDestructSafe for the DBI release.

@FelixYBW
Contributor

FelixYBW commented Mar 5, 2026

@ayushi-agarwal Can you print the call stack and the thread id when you hit the issue? That can help us identify where the issue is.
