diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 110c741a4b4b..a082a720eadd 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -95,7 +95,7 @@ const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNu // cudf const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf"; -constexpr bool kCudfEnabledDefault = true; +constexpr bool kCudfEnabledDefault = false; const std::string kDebugCudf = "spark.gluten.sql.debug.cudf"; const std::string kDebugCudfDefault = "false"; diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 806b65f881c0..56fab701ee07 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -207,6 +207,7 @@ if(ENABLE_GPU) cudf/GpuLock.cc shuffle/VeloxGpuShuffleReader.cc shuffle/VeloxGpuShuffleWriter.cc + operators/serializer/VeloxGpuColumnarBatchSerializer.cc utils/GpuBufferBatchResizer.cc memory/GpuBufferColumnarBatch.cc) endif() diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 966f2af6af7b..5aaf83d7f48c 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -52,6 +52,10 @@ DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks); #include "operators/writer/VeloxParquetDataSourceABFS.h" #endif +#ifdef GLUTEN_ENABLE_GPU +#include "operators/serializer/VeloxGpuColumnarBatchSerializer.h" +#endif + using namespace facebook; namespace gluten { @@ -307,6 +311,11 @@ std::shared_ptr VeloxRuntime::createShuffleReader( std::unique_ptr VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) { auto arrowPool = memoryManager()->defaultArrowMemoryPool(); auto veloxPool = memoryManager()->getLeafMemoryPool(); +#ifdef GLUTEN_ENABLE_GPU + if (veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { // cuDF enabled: deserialize into CudfVector batches. + return std::make_unique(arrowPool, veloxPool, cSchema); + } +#endif return std::make_unique(arrowPool, veloxPool, cSchema); } diff --git
a/cpp/velox/memory/GpuBufferColumnarBatch.cc b/cpp/velox/memory/GpuBufferColumnarBatch.cc index 649aadfb796b..f73873bab372 100644 --- a/cpp/velox/memory/GpuBufferColumnarBatch.cc +++ b/cpp/velox/memory/GpuBufferColumnarBatch.cc @@ -105,10 +105,15 @@ std::shared_ptr GpuBufferColumnarBatch::compose( std::vector> returnBuffers; returnBuffers.reserve(bufferSize); - for (auto size : bufferSizes) { + for (auto i = 0; i < bufferSize; ++i) { + // Defer allocating a null buffer until a batch that actually contains nulls is seen. + if (bufferTypes[i] == BufferType::kNull) { + returnBuffers.emplace_back(nullptr); + continue; + } std::shared_ptr buffer; // May optimize to reuse the first batch buffer. - GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool)); + GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(bufferSizes[i], pool)); returnBuffers.emplace_back(std::move(buffer)); } @@ -123,11 +128,21 @@ std::shared_ptr GpuBufferColumnarBatch::compose( continue; } // Combine the null buffer - // The last byte may still have space to write when nullBitsRemainder != 0. - auto* dst = returnBuffers[bufferIdx]->mutable_data(); - if (batch->bufferAt(bufferIdx) == nullptr) { - arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true); + if (batch->bufferAt(bufferIdx) == nullptr || batch->bufferAt(bufferIdx)->size() == 0) { + if (returnBuffers[bufferIdx] != nullptr) { + auto* dst = returnBuffers[bufferIdx]->mutable_data(); + arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true); + } } else { + // This batch has a null buffer: allocate the deferred output null buffer on first use. + if (returnBuffers[bufferIdx] == nullptr) { + std::shared_ptr buffer; + GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(bufferSizes[bufferIdx], pool)); + returnBuffers[bufferIdx] = buffer; + // Mark all previous rows [0, rowNumber) as not null.
+ arrow::bit_util::SetBitsTo(buffer->mutable_data(), 0, rowNumber, true); + } + auto* dst = returnBuffers[bufferIdx]->mutable_data(); arrow::internal::CopyBitmap(batch->bufferAt(bufferIdx)->data(), 0, batch->numRows(), dst, rowNumber); } diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h b/cpp/velox/operators/plannodes/CudfVectorStream.h index 4707b8789d54..9758d1c35e28 100644 --- a/cpp/velox/operators/plannodes/CudfVectorStream.h +++ b/cpp/velox/operators/plannodes/CudfVectorStream.h @@ -18,7 +18,7 @@ #pragma once #include "CudfVectorStream.h" -#include "velox/experimental/cudf/exec/NvtxHelper.h" +#include "velox/experimental/cudf/exec/CudfOperator.h" #include "velox/experimental/cudf/exec/Utilities.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/vector/CudfVector.h" @@ -93,8 +93,8 @@ class CudfValueStreamNode final : public facebook::velox::core::PlanNode { const std::vector kEmptySources_; }; -// Extends NvtxHelper to identify it as GPU node, so not add CudfFormVelox operator. -class CudfValueStream : public facebook::velox::exec::SourceOperator, public facebook::velox::cudf_velox::NvtxHelper { +// Extends CudfOperator so that it is identified as a GPU node and no CudfFromVelox operator is added.
+class CudfValueStream : public facebook::velox::exec::SourceOperator, public facebook::velox::cudf_velox::CudfOperator { public: CudfValueStream( int32_t operatorId, @@ -106,10 +106,7 @@ class CudfValueStream : public facebook::velox::exec::SourceOperator, public fac operatorId, valueStreamNode->id(), valueStreamNode->name().data()), - facebook::velox::cudf_velox::NvtxHelper( - nvtx3::rgb{160, 82, 45}, // Sienna - operatorId, - fmt::format("[{}]", valueStreamNode->id())) { + facebook::velox::cudf_velox::CudfOperator(operatorId, valueStreamNode->id()) { ResultIterator* itr = valueStreamNode->iterator(); rvStream_ = std::make_unique(driverCtx, pool(), itr, outputType_); } diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h index 73fec890cfc7..f58da732810b 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h +++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h @@ -25,7 +25,7 @@ namespace gluten { -class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer { +class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer { public: VeloxColumnarBatchSerializer( arrow::MemoryPool* arrowPool, @@ -40,7 +40,8 @@ class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer { std::shared_ptr deserialize(uint8_t* data, int32_t size) override; - private: + // Protected rather than private so subclasses (e.g. VeloxGpuColumnarBatchSerializer) can use veloxPool_. + protected: std::shared_ptr veloxPool_; std::unique_ptr arena_; std::unique_ptr serializer_; diff --git a/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc new file mode 100644 index 000000000000..b9993df0e778 --- /dev/null +++ b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "VeloxGpuColumnarBatchSerializer.h" + +#include + +#include "memory/ArrowMemory.h" +#include "memory/VeloxColumnarBatch.h" +#include "velox/common/memory/Memory.h" +#include "velox/vector/FlatVector.h" +#include "velox/vector/arrow/Bridge.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" +#include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/vector/CudfVector.h" + +#include + +using namespace facebook::velox; + +namespace gluten { + +VeloxGpuColumnarBatchSerializer::VeloxGpuColumnarBatchSerializer( + arrow::MemoryPool* arrowPool, + std::shared_ptr veloxPool, + struct ArrowSchema* cSchema) + : VeloxColumnarBatchSerializer(arrowPool, veloxPool, cSchema) {} + +// Deserializes with the base serializer, then converts the rows to a cudf table wrapped in a CudfVector. +std::shared_ptr VeloxGpuColumnarBatchSerializer::deserialize(uint8_t* data, int32_t size) { + auto vb = VeloxColumnarBatchSerializer::deserialize(data, size); + auto stream = cudf_velox::cudfGlobalStreamPool().get_stream(); + auto table = cudf_velox::with_arrow::toCudfTable(dynamic_pointer_cast(vb)->getRowVector(), veloxPool_.get(), stream); + stream.synchronize(); + // `size` is the serialized byte length of `data`; the vector size must be the row count of the batch. + auto vector = std::make_shared(veloxPool_.get(), rowType_, vb->numRows(), std::move(table), stream); + return std::make_shared(vector, vb->numColumns()); +} + +} // namespace gluten diff --git
a/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h new file mode 100644 index 000000000000..5830b4776fd7 --- /dev/null +++ b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "VeloxColumnarBatchSerializer.h" + +#include "memory/ColumnarBatch.h" +#include "operators/serializer/ColumnarBatchSerializer.h" +#include "velox/serializers/PrestoSerializer.h" + +namespace gluten { + +class VeloxGpuColumnarBatchSerializer final : public VeloxColumnarBatchSerializer { + public: + VeloxGpuColumnarBatchSerializer( + arrow::MemoryPool* arrowPool, + std::shared_ptr veloxPool, + struct ArrowSchema* cSchema); + + // Deserializes the batch into a cudf table wrapped in a CudfVector. The cudf pipeline consumes CudfVector directly, + // so the CudfFromVelox operator can be removed from the Velox pipeline input. + std::shared_ptr deserialize(uint8_t* data, int32_t size) override; +}; + +} // namespace gluten