Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ if(${BOLT_BUILD_BENCHMARKS})
set(BOLT_BUILD_TEST_UTILS ON)
set(BOLT_BUILD_TESTING OFF)
set(BOLT_ENABLE_EXAMPLES OFF)
set(BOLT_ENABLE_ABFS OFF)
set(BOLT_ENABLE_SUBSTRAIT OFF)
set(BOLT_CODEGEN_SUPPORT OFF)
endif()
Expand Down Expand Up @@ -369,9 +368,7 @@ if(BOLT_ENABLE_ABFS)
if(AZURESDK_ROOT_DIR)
list(APPEND CMAKE_PREFIX_PATH ${AZURESDK_ROOT_DIR})
endif()
# files-datalake is built on blobs
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED)
find_package(AzureSDK CONFIG REQUIRED)
add_definitions(-DBOLT_ENABLE_ABFS)
endif()

Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ compile_db_all:
$(MAKE) _compile_db \
BUILD_TYPE=Release \
BOLT_BUILD_BENCHMARKS="ON" \
CONAN_OPTIONS=" -o bolt/*:spark_compatible=True -o bolt/*:enable_testutil=True -o bolt/*:enable_s3=True -o bolt/*:enable_gcs=True" \
CONAN_OPTIONS=" -o bolt/*:spark_compatible=True -o bolt/*:enable_testutil=True -o bolt/*:enable_s3=True \
-o bolt/*:enable_gcs=True -o bolt/*:enable_abfs=True" \
CONAN_CONFIG=" -c tools.build:skip_test=False"

export_base:
Expand Down
1 change: 1 addition & 0 deletions bolt/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ target_link_libraries(bolt_engine PUBLIC
$<$<BOOL:${BOLT_ENABLE_SPARK_COMPATIBLE}>:celeborn-cpp-client::celeborn-cpp-client>
$<$<BOOL:${BOLT_ENABLE_S3}>:aws-sdk-cpp::aws-sdk-cpp>
$<$<BOOL:${BOLT_ENABLE_GCS}>:google-cloud-cpp::storage>
$<$<BOOL:${BOLT_ENABLE_ABFS}>:Azure::azure-storage-blobs>
CURL::libcurl
)

Expand Down
2 changes: 0 additions & 2 deletions bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#include "bolt/common/file/File.h"
#include "bolt/connectors/hive/HiveConfig.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsUtil.h"
#include "bolt/core/Config.h"

Expand All @@ -44,7 +43,6 @@
#include "bolt/common/file/File.h"
#include "bolt/connectors/hive/HiveConfig.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
namespace bytedance::bolt::filesystems::abfs {
using namespace Azure::Storage::Blobs;
class AbfsConfig {
Expand Down
8 changes: 7 additions & 1 deletion bolt/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,16 @@ TypePtr ReaderBase::convertType(
"{} converted type can only be set for thrift::Type::(FIXED_LEN_)BYTE_ARRAY.",
thrift::to_string(schemaElement.converted_type));
}
case thrift::ConvertedType::ENUM: {
BOLT_CHECK_EQ(
schemaElement.type,
thrift::Type::BYTE_ARRAY,
"ENUM converted type can only be set for value of thrift::Type::BYTE_ARRAY");
return VARCHAR();
}
case thrift::ConvertedType::MAP:
case thrift::ConvertedType::MAP_KEY_VALUE:
case thrift::ConvertedType::LIST:
case thrift::ConvertedType::ENUM:
case thrift::ConvertedType::TIME_MILLIS:
case thrift::ConvertedType::TIME_MICROS:
case thrift::ConvertedType::BSON:
Expand Down
Binary file added bolt/dwio/parquet/tests/examples/enum_type.parquet
Binary file not shown.
25 changes: 25 additions & 0 deletions bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,31 @@ TEST_F(ParquetReaderTest, readEncryptedParquetWithFilters) {
"encrypted_sample.parquet", rowType, std::move(filters), expected);
}

TEST_F(ParquetReaderTest, testEnumType) {
// enum_type.parquet contains 1 column (ENUM) with 3 rows.
const std::string sample(getExampleFilePath("enum_type.parquet"));

dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReader(sample, readerOptions);
EXPECT_EQ(reader->numberOfRows(), 3ULL);

auto rowType = reader->typeWithId();
EXPECT_EQ(rowType->type()->kind(), TypeKind::ROW);
EXPECT_EQ(rowType->size(), 1ULL);

EXPECT_EQ(rowType->childAt(0)->type()->kind(), TypeKind::VARCHAR);

auto fileSchema = ROW({"test"}, {VARCHAR()});
auto rowReaderOpts = getReaderOpts(fileSchema);
rowReaderOpts.setScanSpec(makeScanSpec(fileSchema));
auto rowReader = reader->createRowReader(rowReaderOpts);

auto expected =
makeRowVector({makeFlatVector<StringView>({"FOO", "BAR", "FOO"})});

assertReadWithReaderAndExpected(fileSchema, *rowReader, expected, *leafPool_);
}

TEST_F(ParquetReaderTest, readBinaryAsStringFromNation) {
const std::string filename("nation.parquet");
const std::string sample(getExampleFilePath(filename));
Expand Down
60 changes: 60 additions & 0 deletions bolt/functions/lib/SIMDComparisonUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
*/
#pragma once

#include "bolt/expression/EvalCtx.h"
#include "bolt/expression/VectorFunction.h"
#include "bolt/vector/DecodedVector.h"

namespace bytedance::bolt::functions {

Expand Down Expand Up @@ -140,6 +142,10 @@ void applyAutoSimdComparisonInternal(
});
}
}

inline bool isDictEncoding(const VectorPtr& arg) {
return arg->encoding() == VectorEncoding::Simple::DICTIONARY;
}
} // namespace detail

template <
Expand Down Expand Up @@ -238,6 +244,7 @@ template <typename A, typename B, typename Compare, typename... Args>
void applyAutoSimdComparison(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
exec::EvalCtx& context,
VectorPtr& result,
Args... cmpArgs) {
const Compare cmp;
Expand All @@ -258,6 +265,42 @@ void applyAutoSimdComparison(
}
},
result);
} else if (args[0]->isFlatEncoding() && detail::isDictEncoding(args[1])) {
const A* __restrict rawA =
args[0]->asUnchecked<FlatVector<A>>()->template rawValues<A>();
DecodedVector decodedB(*args[1], rows);
const B* __restrict rawB = decodedB.data<B>();
const vector_size_t* __restrict indexB = decodedB.indices();
detail::applyAutoSimdComparisonInternal(
rows,
rawA,
rawB,
[&](const A* __restrict rawA, const B* __restrict rawB, int i) {
if constexpr (sizeof...(cmpArgs) > 0) {
return Compare::apply(rawA[i], rawB[indexB[i]], cmpArgs...);
} else {
return cmp(rawA[i], rawB[indexB[i]]);
}
},
result);
} else if (detail::isDictEncoding(args[0]) && args[1]->isFlatEncoding()) {
DecodedVector decodedA(*args[0], rows);
const A* __restrict rawA = decodedA.data<A>();
const vector_size_t* __restrict indexA = decodedA.indices();
const B* __restrict rawB =
args[1]->asUnchecked<FlatVector<B>>()->template rawValues<B>();
detail::applyAutoSimdComparisonInternal(
rows,
rawA,
rawB,
[&](const A* __restrict rawA, const B* __restrict rawB, int i) {
if constexpr (sizeof...(cmpArgs) > 0) {
return Compare::apply(rawA[indexA[i]], rawB[i], cmpArgs...);
} else {
return cmp(rawA[indexA[i]], rawB[i]);
}
},
result);
} else if (args[0]->isConstantEncoding() && args[1]->isFlatEncoding()) {
const A constA = args[0]->asUnchecked<ConstantVector<A>>()->valueAt(0);
const A* __restrict rawA = &constA;
Expand Down Expand Up @@ -313,4 +356,21 @@ void applyAutoSimdComparison(
BOLT_UNREACHABLE();
}
}

template <typename A, typename B>
bool shouldApplyAutoSimdComparison(
const SelectivityVector& rows,
std::vector<VectorPtr>& args) {
if (rows.end() - rows.begin() > 64) {
if ((args[0]->isFlatEncoding() || args[0]->isConstantEncoding()) &&
(args[1]->isFlatEncoding() || args[1]->isConstantEncoding())) {
return true;
} else if (args[0]->isFlatEncoding() && detail::isDictEncoding(args[1])) {
return true;
} else if (detail::isDictEncoding(args[0]) && args[1]->isFlatEncoding()) {
return true;
}
}
return false;
}
} // namespace bytedance::bolt::functions
23 changes: 11 additions & 12 deletions bolt/functions/sparksql/Comparisons.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ class ComparisonFunction final : public exec::VectorFunction {
const Cmp cmp;
context.ensureWritable(rows, BOOLEAN(), result);
result->clearNulls(rows);
if ((args[0]->isFlatEncoding() || args[0]->isConstantEncoding()) &&
(args[1]->isFlatEncoding() || args[1]->isConstantEncoding())) {
if constexpr (
kind == TypeKind::TINYINT || kind == TypeKind::SMALLINT ||
kind == TypeKind::INTEGER || kind == TypeKind::BIGINT) {
if (rows.isAllSelected()) {
applySimdComparison<T, Cmp>(rows, args, result);
return;
}
}
if (rows.end() - rows.begin() > 64) {
applyAutoSimdComparison<T, T, Cmp>(rows, args, result);
if constexpr (
kind == TypeKind::TINYINT || kind == TypeKind::SMALLINT ||
kind == TypeKind::INTEGER || kind == TypeKind::BIGINT) {
if ((args[0]->isFlatEncoding() || args[0]->isConstantEncoding()) &&
(args[1]->isFlatEncoding() || args[1]->isConstantEncoding()) &&
rows.isAllSelected()) {
applySimdComparison<T, Cmp>(rows, args, result);
return;
}
}
if (shouldApplyAutoSimdComparison<T, T>(rows, args)) {
applyAutoSimdComparison<T, T, Cmp>(rows, args, context, result);
return;
}
auto* flatResult = result->asUnchecked<FlatVector<bool>>();

if (args[0]->isFlatEncoding() && args[1]->isFlatEncoding()) {
Expand Down
6 changes: 2 additions & 4 deletions bolt/functions/sparksql/DecimalCompare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ class DecimalCompareFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const override {
prepareResults(rows, resultType, context, result);
if ((args[0]->isFlatEncoding() || args[0]->isConstantEncoding()) &&
(args[1]->isFlatEncoding() || args[1]->isConstantEncoding()) &&
rows.end() - rows.begin() > 64) {
if (shouldApplyAutoSimdComparison<A, B>(rows, args)) {
applyAutoSimdComparison<A, B, Operation, int8_t, bool>(
rows, args, result, deltaScale_, need256_);
rows, args, context, result, deltaScale_, need256_);
return;
}

Expand Down
22 changes: 20 additions & 2 deletions bolt/functions/sparksql/benchmarks/SIMDCompareBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "bolt/benchmarks/ExpressionBenchmarkBuilder.h"
#include "bolt/functions/sparksql/Register.h"
#include "bolt/vector/fuzzer/VectorFuzzer.h"

using namespace bytedance;

Expand All @@ -55,12 +56,29 @@ int main(int argc, char** argv) {
ExpressionBenchmarkBuilder benchmarkBuilder;
for (auto nullRatio : {0.0, 0.1, 0.9}) {
for (auto& inputType : inputTypes) {
VectorFuzzer::Options opts;
opts.vectorSize = 10000;
opts.nullRatio = nullRatio;
VectorFuzzer fuzzer(opts, benchmarkBuilder.pool());
std::vector<VectorPtr> childrenVectors = {
fuzzer.fuzzDictionary(fuzzer.fuzzFlat(inputType)),
fuzzer.fuzzFlat(inputType)};
benchmarkBuilder
.addBenchmarkSet(
fmt::format(
"{}#{}\%null", inputType->toString(), nullRatio * 100),
"Dict#{}#{}\%null", inputType->toString(), nullRatio * 100),
fuzzer.fuzzRow(
std::move(childrenVectors), {"c0", "c1"}, opts.vectorSize))
.addExpression("equalto", "equalto(c0, c1)")
.addExpression("greaterthan", "greaterthan(c0, c1)")
.addExpression("greaterthanorequal", "greaterthanorequal(c0, c1)")
.withIterations(100);
benchmarkBuilder
.addBenchmarkSet(
fmt::format(
"Flat#{}#{}\%null", inputType->toString(), nullRatio * 100),
ROW({"c0", "c1"}, {inputType, inputType}))
.withFuzzerOptions({.vectorSize = 10000, .nullRatio = nullRatio})
.withFuzzerOptions(opts)
.addExpression("equalto", "equalto(c0, c1)")
.addExpression("greaterthan", "greaterthan(c0, c1)")
.addExpression("greaterthanorequal", "greaterthanorequal(c0, c1)")
Expand Down
47 changes: 41 additions & 6 deletions bolt/functions/sparksql/tests/ComparisonsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "bolt/common/base/tests/GTestUtils.h"
#include "bolt/functions/sparksql/tests/SparkFunctionBaseTest.h"
#include "bolt/vector/DecodedVector.h"

#include <bolt/vector/SimpleVector.h>
namespace bytedance::bolt::functions::sparksql::test {
Expand Down Expand Up @@ -94,6 +95,8 @@ class ComparisonsTest : public SparkFunctionBaseTest {
auto right = rVector->template as<FlatVector<T>>()->rawValues();
auto constVector = fuzzer.fuzzConstant(type);
auto constant = constVector->template as<ConstantVector<T>>()->value();
auto lDictVector = fuzzer.fuzzDictionary(lVector);
auto rDictVector = fuzzer.fuzzDictionary(rVector);
SelectivityVector rows(size);
rows.setValidRange(0, unSelectedRows, false);

Expand All @@ -112,21 +115,53 @@ class ComparisonsTest : public SparkFunctionBaseTest {
// Flat, Const
std::vector<VectorPtr> rConstVectors = {lVector, constVector};
rowVector = fuzzer.fuzzRow(std::move(rConstVectors), {"c0", "c1"}, size);
result =
evaluate<SimpleVector<bool>>("greaterthanorequal(c0, c1)", rowVector);
result = evaluate<SimpleVector<bool>>("equalto(c0, c1)", rowVector);
for (auto i = unSelectedRows; i < size; i++) {
expectedResult[i] = left[i] >= constant;
expectedResult[i] = left[i] == constant;
}
bolt::test::assertEqualVectors(
makeFlatVector<bool>(expectedResult), result, rows);

// Const, Flat
std::vector<VectorPtr> lConstVectors = {constVector, rVector};
rowVector = fuzzer.fuzzRow(std::move(lConstVectors), {"c0", "c1"}, size);
result =
evaluate<SimpleVector<bool>>("greaterthanorequal(c0, c1)", rowVector);
result = evaluate<SimpleVector<bool>>("lessthan(c0, c1)", rowVector);
for (auto i = unSelectedRows; i < size; i++) {
expectedResult[i] = constant < right[i];
}
bolt::test::assertEqualVectors(
makeFlatVector<bool>(expectedResult), result, rows);

// Dict(Flat), Flat
childrenVectors = {lDictVector, rVector};
rowVector = fuzzer.fuzzRow(std::move(childrenVectors), {"c0", "c1"}, size);
result = evaluate<SimpleVector<bool>>("lessthanorequal(c0, c1)", rowVector);
DecodedVector lDecodedVector(*lDictVector);
for (auto i = unSelectedRows; i < size; i++) {
expectedResult[i] = lDecodedVector.valueAt<T>(i) <= right[i];
}
bolt::test::assertEqualVectors(
makeFlatVector<bool>(expectedResult), result, rows);

// Flat, Dict(Flat)
childrenVectors = {lVector, rDictVector};
rowVector = fuzzer.fuzzRow(std::move(childrenVectors), {"c0", "c1"}, size);
result = evaluate<SimpleVector<bool>>("greaterthan(c0, c1)", rowVector);
DecodedVector rDecodedVector(*rDictVector);
for (auto i = unSelectedRows; i < size; i++) {
expectedResult[i] = left[i] > rDecodedVector.valueAt<T>(i);
}
bolt::test::assertEqualVectors(
makeFlatVector<bool>(expectedResult), result, rows);

// Dict(Dict(Flat)), Flat
auto lDictDictVector = fuzzer.fuzzDictionary(lDictVector);
childrenVectors = {lDictDictVector, rVector};
rowVector = fuzzer.fuzzRow(std::move(childrenVectors), {"c0", "c1"}, size);
result = evaluate<SimpleVector<bool>>("lessthanorequal(c0, c1)", rowVector);
DecodedVector decodedVector(*lDictDictVector);
for (auto i = unSelectedRows; i < size; i++) {
expectedResult[i] = constant >= right[i];
expectedResult[i] = decodedVector.valueAt<T>(i) <= right[i];
}
bolt::test::assertEqualVectors(
makeFlatVector<bool>(expectedResult), result, rows);
Expand Down
Loading
Loading