Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNu

// cudf
const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
constexpr bool kCudfEnabledDefault = true;
constexpr bool kCudfEnabledDefault = false;
const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
const std::string kDebugCudfDefault = "false";

Expand Down
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ if(ENABLE_GPU)
cudf/GpuLock.cc
shuffle/VeloxGpuShuffleReader.cc
shuffle/VeloxGpuShuffleWriter.cc
operators/serializer/VeloxGpuColumnarBatchSerializer.cc
utils/GpuBufferBatchResizer.cc
memory/GpuBufferColumnarBatch.cc)
endif()
Expand Down
9 changes: 9 additions & 0 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);
#include "operators/writer/VeloxParquetDataSourceABFS.h"
#endif

#ifdef GLUTEN_ENABLE_GPU
#include "operators/serializer/VeloxGpuColumnarBatchSerializer.h"
#endif

using namespace facebook;

namespace gluten {
Expand Down Expand Up @@ -307,6 +311,11 @@ std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
auto arrowPool = memoryManager()->defaultArrowMemoryPool();
auto veloxPool = memoryManager()->getLeafMemoryPool();
#ifdef GLUTEN_ENABLE_GPU
if (veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
return std::make_unique<VeloxGpuColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
}
#endif
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
}

Expand Down
27 changes: 21 additions & 6 deletions cpp/velox/memory/GpuBufferColumnarBatch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,15 @@ std::shared_ptr<GpuBufferColumnarBatch> GpuBufferColumnarBatch::compose(

std::vector<std::shared_ptr<arrow::Buffer>> returnBuffers;
returnBuffers.reserve(bufferSize);
for (auto size : bufferSizes) {
for (auto i = 0; i < bufferSize; ++i) {
// Defer the null buffer to really contains null.
if (bufferTypes[i] == BufferType::kNull) {
returnBuffers.emplace_back(nullptr);
continue;
}
std::shared_ptr<arrow::Buffer> buffer;
// May optimize to reuse the first batch buffer.
GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool));
GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(bufferSizes[i], pool));
returnBuffers.emplace_back(std::move(buffer));
}

Expand All @@ -123,11 +128,21 @@ std::shared_ptr<GpuBufferColumnarBatch> GpuBufferColumnarBatch::compose(
continue;
}
// Combine the null buffer
// The last byte may still have space to write when nullBitsRemainder != 0.
auto* dst = returnBuffers[bufferIdx]->mutable_data();
if (batch->bufferAt(bufferIdx) == nullptr) {
arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true);
if (batch->bufferAt(bufferIdx) == nullptr || batch->bufferAt(bufferIdx)->size() == 0) {
if (returnBuffers[bufferIdx] != nullptr) {
auto* dst = returnBuffers[bufferIdx]->mutable_data();
arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true);
}
} else {
// Need to allocate null buffer.
if (returnBuffers[bufferIdx] == nullptr) {
std::shared_ptr<arrow::Buffer> buffer;
GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(bufferSizes[bufferIdx], pool));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be arrow::bit_util::BytesForBits(bufferSizes[bufferIdx])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returnBuffers[bufferIdx] = buffer;
// Set all the previous rows to not null.
arrow::bit_util::SetBitsTo(buffer->mutable_data(), 0, rowNumber, true);
}
auto* dst = returnBuffers[bufferIdx]->mutable_data();
arrow::internal::CopyBitmap(batch->bufferAt(bufferIdx)->data(), 0, batch->numRows(), dst, rowNumber);
}

Expand Down
11 changes: 4 additions & 7 deletions cpp/velox/operators/plannodes/CudfVectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#pragma once

#include "CudfVectorStream.h"
#include "velox/experimental/cudf/exec/NvtxHelper.h"
#include "velox/experimental/cudf/exec/CudfOperator.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
#include "velox/experimental/cudf/vector/CudfVector.h"
Expand Down Expand Up @@ -93,8 +93,8 @@ class CudfValueStreamNode final : public facebook::velox::core::PlanNode {
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
};

// Extends NvtxHelper to identify it as GPU node, so not add CudfFormVelox operator.
class CudfValueStream : public facebook::velox::exec::SourceOperator, public facebook::velox::cudf_velox::NvtxHelper {
// Extends CudfOperator to identify it as GPU node, so not add CudfFormVelox operator.
class CudfValueStream : public facebook::velox::exec::SourceOperator, public facebook::velox::cudf_velox::CudfOperator {
public:
CudfValueStream(
int32_t operatorId,
Expand All @@ -106,10 +106,7 @@ class CudfValueStream : public facebook::velox::exec::SourceOperator, public fac
operatorId,
valueStreamNode->id(),
valueStreamNode->name().data()),
facebook::velox::cudf_velox::NvtxHelper(
nvtx3::rgb{160, 82, 45}, // Sienna
operatorId,
fmt::format("[{}]", valueStreamNode->id())) {
facebook::velox::cudf_velox::CudfOperator(operatorId, valueStreamNode->id()) {
ResultIterator* itr = valueStreamNode->iterator();
rvStream_ = std::make_unique<CudfVectorStream>(driverCtx, pool(), itr, outputType_);
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace gluten {

class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer {
public:
VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
Expand All @@ -40,7 +40,7 @@ class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {

std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size) override;

private:
protected:
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::unique_ptr<facebook::velox::StreamArena> arena_;
std::unique_ptr<facebook::velox::IterativeVectorSerializer> serializer_;
Expand Down
54 changes: 54 additions & 0 deletions cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "VeloxGpuColumnarBatchSerializer.h"

#include <arrow/buffer.h>

#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
#include "velox/common/memory/Memory.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/arrow/Bridge.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/vector/CudfVector.h"

#include <iostream>

using namespace facebook::velox;

namespace gluten {

VeloxGpuColumnarBatchSerializer::VeloxGpuColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema)
: VeloxColumnarBatchSerializer(arrowPool, veloxPool, cSchema) {
}

std::shared_ptr<ColumnarBatch> VeloxGpuColumnarBatchSerializer::deserialize(uint8_t* data, int32_t size) {
auto vb = VeloxColumnarBatchSerializer::deserialize(data, size);
auto stream = cudf_velox::cudfGlobalStreamPool().get_stream();
auto table = cudf_velox::with_arrow::toCudfTable(dynamic_pointer_cast<VeloxColumnarBatch>(vb)->getRowVector(), veloxPool_.get(), stream);
stream.synchronize();
auto vector = std::make_shared<cudf_velox::CudfVector>(
veloxPool_.get(), rowType_, size, std::move(table), stream);
return std::make_shared<VeloxColumnarBatch>(vector, vb->numColumns());
}

} // namespace gluten
42 changes: 42 additions & 0 deletions cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <arrow/c/abi.h>

#include "VeloxColumnarBatchSerializer.h"

#include "memory/ColumnarBatch.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "velox/serializers/PrestoSerializer.h"

namespace gluten {

class VeloxGpuColumnarBatchSerializer final : public VeloxColumnarBatchSerializer {
public:
VeloxGpuColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema);

// Deserialize to cudf table, then the Cudf pipeline accepts CudfVector, we can remove CudfFromveloc operator from the
// velox pipeline input.
std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size) override;
};

} // namespace gluten