Merged
6 changes: 0 additions & 6 deletions cpp/core/memory/ArrowMemoryPool.cc
@@ -20,12 +20,6 @@

 namespace gluten {

-ArrowMemoryPool::~ArrowMemoryPool() {
-  if (releaser_ != nullptr) {
-    releaser_(this);
-  }
-}
-
 arrow::Status ArrowMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) {
   if (!allocator_->allocateAligned(alignment, size, reinterpret_cast<void**>(out))) {
     return arrow::Status::Invalid("WrappedMemoryPool: Error allocating " + std::to_string(size) + " bytes");
9 changes: 3 additions & 6 deletions cpp/core/memory/ArrowMemoryPool.h
@@ -30,11 +30,10 @@ using ArrowMemoryPoolReleaser = std::function<void(arrow::MemoryPool*)>;
 class ArrowMemoryPool final : public arrow::MemoryPool {
  public:
   // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init, hicpp-member-init)
-  explicit ArrowMemoryPool(AllocationListener* listener, ArrowMemoryPoolReleaser releaser = nullptr)
-      : allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), listener)),
-        releaser_(std::move(releaser)) {}
+  explicit ArrowMemoryPool(AllocationListener* listener)
+      : allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), listener)) {}

-  ~ArrowMemoryPool() override;
+  ~ArrowMemoryPool() override = default;

   ArrowMemoryPool(const ArrowMemoryPool&) = delete;

@@ -64,8 +63,6 @@ class ArrowMemoryPool final : public arrow::MemoryPool {

  private:
   std::unique_ptr<MemoryAllocator> allocator_ = nullptr;
-
-  ArrowMemoryPoolReleaser releaser_;
 };

 } // namespace gluten
28 changes: 13 additions & 15 deletions cpp/velox/memory/VeloxMemoryManager.cc
@@ -294,10 +294,13 @@ MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(
   return stats;
 }

-void logMemoryUsageStats(MemoryUsageStats stats, const std::string& name, const std::string& logPrefix, std::stringstream& ss) {
-  ss << logPrefix << "+- " << name
-     << " (used: " << velox::succinctBytes(stats.current())
-     << ", peak: " << velox::succinctBytes(stats.peak()) << ")\n";
+void logMemoryUsageStats(
+    MemoryUsageStats stats,
+    const std::string& name,
+    const std::string& logPrefix,
+    std::stringstream& ss) {
+  ss << logPrefix << "+- " << name << " (used: " << velox::succinctBytes(stats.current())
+     << ", peak: " << velox::succinctBytes(stats.peak()) << ")\n";
   if (stats.children_size() > 0) {
     for (auto it = stats.children().begin(); it != stats.children().end(); ++it) {
       logMemoryUsageStats(it->second, it->first, logPrefix + " ", ss);
@@ -327,22 +330,17 @@ int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
 std::shared_ptr<arrow::MemoryPool> VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
   std::lock_guard<std::mutex> l(mutex_);
   if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
-    auto pool = it->second.lock();
-    VELOX_CHECK_NOT_NULL(pool, "Arrow memory pool {} has been destructed", name);
-    return pool;
+    if (auto pool = it->second.lock()) {
+      return pool;
+    }
+    arrowPools_.erase(name);
   }
-  auto pool = std::make_shared<ArrowMemoryPool>(
-      blockListener_.get(), [this, name](arrow::MemoryPool* pool) { this->dropMemoryPool(name); });
+
+  auto pool = std::make_shared<ArrowMemoryPool>(blockListener_.get());
Comment on lines +333 to +339
Member

If we no longer remove the entries, will there be any memory leak risk caused by the expanding entry list in arrowPools_?

Contributor

This doesn't look like an issue as long as the number of keys is not too large.

Contributor Author

@baibaichen mentioned in the issue that we could use a background thread to clean up, but I wonder if that might be too heavy for the current use of the Arrow pool, since only shuffle and Parquet write create named Arrow memory pools.

If necessary, we could perhaps loop over the weak_ptr entries each time getOrCreateArrowMemoryPool is called and remove the expired ones. @zhztheplayer What do you think?
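
The sweep-on-lookup idea suggested above could look roughly like the following. This is a minimal sketch and not the Gluten code: `FakePool` and `PoolRegistry` are hypothetical stand-ins for `ArrowMemoryPool` and `VeloxMemoryManager`, and the map is assumed to hold `std::weak_ptr` values as in the PR.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for ArrowMemoryPool (assumption, not the real class).
struct FakePool {
  explicit FakePool(std::string n) : name(std::move(n)) {}
  std::string name;
};

// Hypothetical registry shaped like getOrCreateArrowMemoryPool.
class PoolRegistry {
 public:
  std::shared_ptr<FakePool> getOrCreate(const std::string& name) {
    std::lock_guard<std::mutex> l(mutex_);
    // Sweep: erase every entry whose pool has already been destroyed.
    for (auto it = pools_.begin(); it != pools_.end();) {
      if (it->second.expired()) {
        it = pools_.erase(it);
      } else {
        ++it;
      }
    }
    // The owner may release a pool without holding mutex_, so lock() can
    // still return null here even right after the sweep.
    auto found = pools_.find(name);
    if (found != pools_.end()) {
      if (auto pool = found->second.lock()) {
        return pool;
      }
    }
    auto pool = std::make_shared<FakePool>(name);
    pools_[name] = pool; // inserts, or overwrites a just-expired entry
    return pool;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> l(mutex_);
    return pools_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, std::weak_ptr<FakePool>> pools_;
};
```

Each call pays a scan that is linear in the number of entries, which is likely negligible if only a few named pools exist, but that cost is the trade-off to weigh against a background cleanup thread.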

Member

> This doesn't look like an issue as long as the number of keys is not too large.

I think it might depend on the usage. Hi @marin-ma, do you think we can skip cleaning up the Arrow pools? In that case, we can simply store a shared pointer as the class member.

> If necessary, we could perhaps loop over the weak_ptr entries each time getOrCreateArrowMemoryPool is called and remove the expired ones.

Otherwise, this might be helpful, but we should also figure out whether it would cause any performance instability.

Contributor

If there aren’t many entries in arrowPools_, I think it’s fine to keep them via shared_ptr. A MemoryPool object itself shouldn’t take much memory anyway.

Contributor Author

Using weak_ptr helps clarify the ownership of the Arrow pool. For example, if the Arrow pool is used by shuffle A, it should be destroyed together with shuffle writer A itself. The next shuffle, B, will then create a new pool. This way we can also track the memory allocation status of each shuffle separately.

Member

@marin-ma I meant: is it feasible to use std::unordered_map<std::string, std::shared_ptr<ArrowMemoryPool>> arrowPools_? That is, only changing the map value type from std::weak_ptr to std::shared_ptr.

Member

Because in the PR's latest code, expired map entries won't be removed from the map until they are replaced by a new value for the same key. Is this an intentional change?

Contributor Author

> Because in the PR's latest code, expired map entries won't be removed from the map until they are replaced by a new value for the same key. Is this an intentional change?

Yes. This way, a new Arrow pool is created for each shuffle writer instance. If we changed to std::unordered_map<std::string, std::shared_ptr<ArrowMemoryPool>> arrowPools_, new shuffle writers would reuse the pools created by the previous shuffle.
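
To make the difference between the two map value types concrete, here is a small sketch under stated assumptions: `TrackedPool`, `WeakRegistry`, and `SharedRegistry` are hypothetical names invented for illustration, locking is omitted, and each pool carries an `id` only so that a fresh instance can be told apart from a reused one.

```cpp
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for ArrowMemoryPool; id distinguishes instances.
struct TrackedPool {
  explicit TrackedPool(int i) : id(i) {}
  int id;
};

// Map of weak_ptr: the registry does not own pools, so a pool dies with
// its last user and the next request under the same name gets a new one.
struct WeakRegistry {
  std::unordered_map<std::string, std::weak_ptr<TrackedPool>> pools;
  int nextId = 0;

  std::shared_ptr<TrackedPool> getOrCreate(const std::string& name) {
    auto it = pools.find(name);
    if (it != pools.end()) {
      if (auto pool = it->second.lock()) {
        return pool;
      }
    }
    auto pool = std::make_shared<TrackedPool>(nextId++);
    pools[name] = pool; // replaces the expired entry, if any
    return pool;
  }
};

// Map of shared_ptr: the registry co-owns pools, so a pool outlives its
// users and later requests under the same name reuse it.
struct SharedRegistry {
  std::unordered_map<std::string, std::shared_ptr<TrackedPool>> pools;
  int nextId = 0;

  std::shared_ptr<TrackedPool> getOrCreate(const std::string& name) {
    auto& slot = pools[name];
    if (!slot) {
      slot = std::make_shared<TrackedPool>(nextId++);
    }
    return slot;
  }
};
```

With `WeakRegistry`, releasing the pool and asking for the same name again yields a pool with a different `id`; with `SharedRegistry`, the same `id` comes back, which corresponds to the reuse across shuffles described above.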

   arrowPools_.emplace(name, pool);
   return pool;
 }

-void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
-  std::lock_guard<std::mutex> l(mutex_);
-  const auto ret = arrowPools_.erase(name);
-  VELOX_CHECK_EQ(ret, 1, "Child memory pool {} doesn't exist", name);
-}
-
 const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
   MemoryUsageStats stats;
   stats.set_current(listener_->currentBytes());
2 changes: 0 additions & 2 deletions cpp/velox/memory/VeloxMemoryManager.h
@@ -103,8 +103,6 @@ class VeloxMemoryManager final : public MemoryManager {
  private:
   bool tryDestructSafe();

-  void dropMemoryPool(const std::string& name);
-
   std::unique_ptr<AllocationListener> listener_;
   std::unique_ptr<AllocationListener> blockListener_;
