From 8285cb6087e7d8d09d61ccc8957e3797fb662886 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 19 Dec 2025 12:00:32 +0000 Subject: [PATCH 1/2] support filesystem dim join --- velox/connectors/filesystem/CMakeLists.txt | 4 +- .../filesystem/FileSystemColumnHandle.cpp | 62 +++ .../filesystem/FileSystemColumnHandle.h | 40 ++ .../filesystem/FileSystemConfig.cpp | 21 +- .../connectors/filesystem/FileSystemConfig.h | 75 ++- .../filesystem/FileSystemConnector.cpp | 78 ++- .../filesystem/FileSystemConnector.h | 19 +- .../filesystem/FileSystemDataSink.cpp | 3 +- .../filesystem/FileSystemDataSink.h | 3 +- .../filesystem/FileSystemIndexSource.cpp | 524 ++++++++++++++++++ .../filesystem/FileSystemIndexSource.h | 198 +++++++ .../filesystem/FileSystemIndexTable.h | 38 ++ .../filesystem/FileSystemIndexTableHandle.h | 165 ++++++ .../tests/FileSystemConnectorTest.cpp | 2 - velox/dwio/text/CMakeLists.txt | 7 +- velox/dwio/text/RegisterTextReader.cpp | 28 + velox/dwio/text/RegisterTextReader.h | 25 + velox/dwio/text/reader/CMakeLists.txt | 16 + velox/dwio/text/reader/TextReader.cpp | 211 +++++++ velox/dwio/text/reader/TextReader.h | 151 +++++ velox/dwio/text/tests/CMakeLists.txt | 1 + velox/dwio/text/tests/reader/CMakeLists.txt | 27 + .../dwio/text/tests/reader/TextReaderTest.cpp | 165 ++++++ velox/experimental/stateful/CMakeLists.txt | 1 + .../stateful/StatefulOperator.cpp | 2 - .../experimental/stateful/StatefulPlanner.cpp | 4 + .../stateful/StreamLookupJoin.cpp | 42 ++ .../experimental/stateful/StreamLookupJoin.h | 34 ++ 28 files changed, 1910 insertions(+), 36 deletions(-) create mode 100644 velox/connectors/filesystem/FileSystemColumnHandle.cpp create mode 100644 velox/connectors/filesystem/FileSystemColumnHandle.h create mode 100644 velox/connectors/filesystem/FileSystemIndexSource.cpp create mode 100644 velox/connectors/filesystem/FileSystemIndexSource.h create mode 100644 velox/connectors/filesystem/FileSystemIndexTable.h create mode 100644 velox/connectors/filesystem/FileSystemIndexTableHandle.h create mode 100644 velox/dwio/text/RegisterTextReader.cpp create mode 100644 velox/dwio/text/RegisterTextReader.h create mode 100644 velox/dwio/text/reader/CMakeLists.txt create mode 100644 velox/dwio/text/reader/TextReader.cpp create mode 100644 velox/dwio/text/reader/TextReader.h create mode 100644 velox/dwio/text/tests/reader/CMakeLists.txt create mode 100644 velox/dwio/text/tests/reader/TextReaderTest.cpp create mode 100644 velox/experimental/stateful/StreamLookupJoin.cpp create mode 100644 velox/experimental/stateful/StreamLookupJoin.h diff --git a/velox/connectors/filesystem/CMakeLists.txt b/velox/connectors/filesystem/CMakeLists.txt index 9de31dce9671..eae7ca04a3ae 100644 --- a/velox/connectors/filesystem/CMakeLists.txt +++ b/velox/connectors/filesystem/CMakeLists.txt @@ -21,7 +21,9 @@ velox_add_library( FileSystemConfig.cpp FileSystemConnector.cpp FileSystemInsertTableHandle.cpp - FileSystemDataSink.cpp) + FileSystemDataSink.cpp + FileSystemIndexSource.cpp + FileSystemColumnHandle.cpp) velox_link_libraries(velox_filesystem_connector velox_common_io velox_connector) diff --git a/velox/connectors/filesystem/FileSystemColumnHandle.cpp b/velox/connectors/filesystem/FileSystemColumnHandle.cpp new file mode 100644 index 000000000000..9a898b32791d --- /dev/null +++ b/velox/connectors/filesystem/FileSystemColumnHandle.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/filesystem/FileSystemColumnHandle.h" +#include + +namespace facebook::velox::connector::filesystem { + +std::string FileSystemColumnHandle::toString() const { + std::string s = hive::HiveColumnHandle::toString(); + boost::algorithm::replace_all( + s, "HiveColumnHandle", "FileSystemColumnHandle"); + return s; +} + +folly::dynamic FileSystemColumnHandle::serialize() const { + folly::dynamic obj = ColumnHandle::serializeBase("FileSystemColumnHandle"); + obj["fileSystemColumnHandleName"] = hive::HiveColumnHandle::name(); + obj["columnType"] = columnTypeName(hive::HiveColumnHandle::columnType()); + obj["dataType"] = hive::HiveColumnHandle::dataType()->serialize(); + folly::dynamic requiredSubfields = folly::dynamic::array; + const std::vector& subFields = + hive::HiveColumnHandle::requiredSubfields(); + for (const auto& subfield : subFields) { + requiredSubfields.push_back(subfield.toString()); + } + obj["requiredSubfields"] = requiredSubfields; + return obj; +} + +ColumnHandlePtr FileSystemColumnHandle::create(const folly::dynamic& obj) { + auto name = obj["fileSystemColumnHandleName"].asString(); + auto columnType = columnTypeFromName(obj["columnType"].asString()); + auto dataType = ISerializable::deserialize(obj["dataType"]); + const auto& arr = obj["requiredSubfields"]; + std::vector requiredSubfields; + requiredSubfields.reserve(arr.size()); + for (auto& s : arr) { + requiredSubfields.emplace_back(s.asString()); + } + return std::make_shared( + name, columnType, dataType, std::move(requiredSubfields)); +} + +void FileSystemColumnHandle::registerSerDe() { + auto& registry = DeserializationRegistryForSharedPtr(); + registry.Register("FileSystemColumnHandle", FileSystemColumnHandle::create); +} +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemColumnHandle.h b/velox/connectors/filesystem/FileSystemColumnHandle.h new file mode 100644 index 000000000000..4dfda102ff01 --- /dev/null +++ b/velox/connectors/filesystem/FileSystemColumnHandle.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/connectors/hive/TableHandle.h" +#include "velox/type/Subfield.h" + +namespace facebook::velox::connector::filesystem { +class FileSystemColumnHandle : public hive::HiveColumnHandle { + public: + FileSystemColumnHandle( + const std::string& name, + ColumnType columnType, + TypePtr dataType, + std::vector requiredSubfields = {}) + : hive::HiveColumnHandle(name, columnType, dataType, dataType, {}, {}) {} + + std::string toString() const; + + folly::dynamic serialize() const override; + + static ColumnHandlePtr create(const folly::dynamic& obj); + + static void registerSerDe(); +}; +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemConfig.cpp b/velox/connectors/filesystem/FileSystemConfig.cpp index 40599a113c7e..36f334c4dec6 100644 --- a/velox/connectors/filesystem/FileSystemConfig.cpp +++ b/velox/connectors/filesystem/FileSystemConfig.cpp @@ -15,12 +15,12 @@ */ #include "velox/connectors/filesystem/FileSystemConfig.h" -#include +#include "velox/dwio/common/Options.h" namespace facebook::velox::connector::filesystem { template -const T FileSystemWriteConfig::checkAndGetConfigValue( +const T FileSystemConfig::checkAndGetConfigValue( const std::string& configKey, const T& defaultValue) const { std::optional configValue = @@ -102,4 +102,19 @@ const int32_t FileSystemWriteConfig::getFileRollingSize() { } } -} // namespace facebook::velox::connector::filesystem +const std::string FileSystemReadConfig::getFieldDelimiter() { + return checkAndGetConfigValue( + kTextFormatFieldDelimiter, defaultTextFormatFieldDelimiter); +} + +const uint64_t FileSystemReadConfig::getMaxReadRows() { + return checkAndGetConfigValue( + kMaxReadRows, defaultMaxReadRows); +} + +const uint64_t FileSystemReadConfig::getMaxReadBytes() { + return checkAndGetConfigValue( + kMaxReadBytes, defaultMaxReadBytes); +} + +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemConfig.h b/velox/connectors/filesystem/FileSystemConfig.h index 70fc524a2362..97b4bedddcbc 100644 --- a/velox/connectors/filesystem/FileSystemConfig.h +++ b/velox/connectors/filesystem/FileSystemConfig.h @@ -24,13 +24,47 @@ namespace facebook::velox::connector::filesystem { using ConfigPtr = std::shared_ptr; -class FileSystemWriteConfig { +class FileSystemConfig { public: - FileSystemWriteConfig(const ConfigPtr& config) : config_(config) {} + FileSystemConfig(const ConfigPtr& config) : config_(config) {} static constexpr const char* kPath = "path"; - /// The config key fo format static constexpr const char* kFormat = "format"; + + const std::string getPath(); + const dwio::common::FileFormat getFormat(); + + const bool exists(const std::string& configKey) { + return config_ && config_->valueExists(configKey); + } + + const ConfigPtr& config() { + return config_; + } + + template + const std::shared_ptr updateAndGetAllConfigs( + const std::unordered_map& configs) const { + std::unordered_map rawConfigs = + config_->rawConfigsCopy(); + rawConfigs.insert(configs.begin(), configs.end()); + ConfigPtr newConfig = + std::make_shared(std::move(rawConfigs)); + return std::make_shared(newConfig); + } + + protected: + ConfigPtr config_; + + template + const T checkAndGetConfigValue(const std::string& configKey, const T& defaultValue) + const; +}; + +class FileSystemWriteConfig : public FileSystemConfig { + public: + FileSystemWriteConfig(const ConfigPtr& config) : FileSystemConfig(config) {} + static constexpr const char* kFileRollingInterval = "sink.rolling-policy.rollover-interval"; static constexpr const char* kFileRollingSize = @@ -74,6 +108,24 @@ class FileSystemWriteConfig { const bool flushOnWrite() { return true; } +}; + +class FileSystemReadConfig : public FileSystemConfig { + public: + FileSystemReadConfig(const ConfigPtr& config) : FileSystemConfig(config) {} + + static constexpr const char* kTextFormatFieldDelimiter = + "csv.field.delimiter"; + static constexpr const char* kMaxReadRows = "max.read.rows"; + static constexpr const char* kMaxReadBytes = "max.read.bytes"; + + static constexpr const char* defaultTextFormatFieldDelimiter = ","; + static constexpr const uint64_t defaultMaxReadRows = 10000; + static constexpr const uint64_t defaultMaxReadBytes = 1024; + + const std::string getFieldDelimiter(); + const uint64_t getMaxReadRows(); + const uint64_t getMaxReadBytes(); const bool exists(const std::string& configKey) { return config_ && config_->valueExists(configKey); } @@ -82,22 +134,7 @@ class FileSystemWriteConfig { return config_; } - template - const std::shared_ptr updateAndGetAllConfigs( - const std::unordered_map& configs) const { - std::unordered_map rawConfigs = - config_->rawConfigsCopy(); - rawConfigs.insert(configs.begin(), configs.end()); - ConfigPtr newConfig = - std::make_shared(std::move(rawConfigs)); - return std::make_shared(newConfig); - } - private: ConfigPtr config_; - - template - const T checkAndGetConfigValue(const std::string& configKey, const T& defaultValue) - const; }; -} // namespace facebook::velox::connector::filesystem +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemConnector.cpp b/velox/connectors/filesystem/FileSystemConnector.cpp index 56707105a890..ae987ad9c0ea 100644 --- a/velox/connectors/filesystem/FileSystemConnector.cpp +++ b/velox/connectors/filesystem/FileSystemConnector.cpp @@ -13,10 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +#include "velox/common/config/Config.h" #include "velox/connectors/filesystem/FileSystemConnector.h" +#include "velox/connectors/filesystem/FileSystemIndexSource.h" +#include "velox/connectors/filesystem/FileSystemIndexTableHandle.h" #include "velox/connectors/filesystem/FileSystemInsertTableHandle.h" #include "velox/connectors/filesystem/FileSystemDataSink.h" +#include namespace facebook::velox::connector::filesystem { @@ -51,4 +54,75 @@ std::unique_ptr FileSystemConnector::createDataSink( insertTableHandle->parititonIndexes(), insertTableHandle->partitionKeys()); } -} // namespace facebook::velox::connector::filesystem + +core::TypedExprPtr toLookupJoinConditionExpr( + const std::vector>& + joinConditions, + const std::shared_ptr& tableHandle, + const RowTypePtr& inputType) { + if (joinConditions.empty()) { + return nullptr; + } + const auto& keyType = tableHandle->keyType(); + std::vector conditionExprs; + conditionExprs.reserve(joinConditions.size()); + for (const auto& condition : joinConditions) { + auto indexColumnExpr = std::make_shared( + keyType->findChild(condition->key->name()), condition->key->name()); + if (auto inCondition = + std::dynamic_pointer_cast( + condition)) { + conditionExprs.push_back(std::make_shared( + BOOLEAN(), + std::vector{ + inCondition->list, std::move(indexColumnExpr)}, + "contains")); + continue; + } + if (auto betweenCondition = + std::dynamic_pointer_cast( + condition)) { + conditionExprs.push_back(std::make_shared( + BOOLEAN(), + std::vector{ + std::move(indexColumnExpr), + betweenCondition->lower, + betweenCondition->upper}, + "between")); + continue; + } + VELOX_FAIL("Invalid index join condition: {}", condition->toString()); + } + return std::make_shared( + BOOLEAN(), conditionExprs, "and"); +} + +std::shared_ptr FileSystemConnector::createIndexSource( + const RowTypePtr& inputType, + size_t numJoinKeys, + const std::vector>& + joinConditions, + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + ConnectorQueryCtx* connectorQueryCtx) { + const std::shared_ptr fsTableHandle = + std::dynamic_pointer_cast(tableHandle); + + std::shared_ptr executor; + if (fsTableHandle->asyncLookup()) { + executor = std::make_shared(1); + } + return std::make_shared( + inputType, + outputType, + numJoinKeys, + toLookupJoinConditionExpr(joinConditions, fsTableHandle, inputType), + fsTableHandle, + connectorQueryCtx, + executor); +} + +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemConnector.h b/velox/connectors/filesystem/FileSystemConnector.h index 6ac9c1015fe8..65d0311b9aba 100644 --- a/velox/connectors/filesystem/FileSystemConnector.h +++ b/velox/connectors/filesystem/FileSystemConnector.h @@ -16,6 +16,7 @@ #pragma once #include "velox/connectors/Connector.h" +#include "velox/connectors/filesystem/FileSystemDataSink.h" namespace facebook::velox::connector::filesystem { @@ -35,6 +36,18 @@ class FileSystemConnector : public Connector { columnHandles, ConnectorQueryCtx* connectorQueryCtx) override; + std::shared_ptr createIndexSource( + const RowTypePtr& inputType, + size_t numJoinKeys, + const std::vector>& + joinConditions, + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + ConnectorQueryCtx* connectorQueryCtx) override; + std::unique_ptr createDataSink( RowTypePtr inputType, std::shared_ptr connectorInsertTableHandle, @@ -45,6 +58,10 @@ class FileSystemConnector : public Connector { return false; } + bool supportsIndexLookup() const override { + return true; + } + const std::shared_ptr& connectorConfig() const override { return config_; @@ -80,4 +97,4 @@ class FileSystemConnectorFactory : public ConnectorFactory { } }; -} // namespace facebook::velox::connector::filesystem +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemDataSink.cpp b/velox/connectors/filesystem/FileSystemDataSink.cpp index e4ad78e632e6..8fbc04f96738 100644 --- a/velox/connectors/filesystem/FileSystemDataSink.cpp +++ b/velox/connectors/filesystem/FileSystemDataSink.cpp @@ -15,7 +15,6 @@ */ #include "velox/connectors/filesystem/FileSystemDataSink.h" -#include #include "velox/common/base/Fs.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/Options.h" @@ -601,4 +600,4 @@ const std::pair FsFileNameGenerator::gen() const { return fileNames; } -} // namespace facebook::velox::connector::filesystem +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemDataSink.h b/velox/connectors/filesystem/FileSystemDataSink.h index 62faa5f798f7..6a536b82b4ba 100644 --- a/velox/connectors/filesystem/FileSystemDataSink.h +++ b/velox/connectors/filesystem/FileSystemDataSink.h @@ -261,7 +261,6 @@ class FileSystemDataSink : public DataSink { void write(size_t index, RowVectorPtr input); // Compute the partition id for each row in 'input'. void computePartitionIds(const RowVectorPtr& input); - void splitInputRowsAndEnsureWriters(); const std::unique_ptr createWriter( @@ -316,4 +315,4 @@ class FileSystemDataSink : public DataSink { const std::string& partitionName); }; -} // namespace facebook::velox::connector::filesystem +} // namespace facebook::velox::connector::filesystem \ No newline at end of file diff --git a/velox/connectors/filesystem/FileSystemIndexSource.cpp b/velox/connectors/filesystem/FileSystemIndexSource.cpp new file mode 100644 index 000000000000..75d2944b4511 --- /dev/null +++ b/velox/connectors/filesystem/FileSystemIndexSource.cpp @@ -0,0 +1,524 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/filesystem/FileSystemIndexSource.h" +#include +#include +#include +#include "velox/common/memory/RawVector.h" +#include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/common/ReaderFactory.h" +#include "velox/dwio/text/reader/TextReader.h" +#include "velox/exec/IndexLookupJoin.h" +#include "velox/exec/OperatorUtils.h" +#include "velox/exec/VectorHasher.h" +#include "velox/expression/FieldReference.h" + +namespace facebook::velox::connector::filesystem { + +FileSystemIndexSource::FileSystemIndexSource( + const RowTypePtr& inputType, + const RowTypePtr& outputType, + size_t numEqualJoinKeys, + const core::TypedExprPtr& joinConditionExpr, + const std::shared_ptr& tableHandle, + connector::ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr& executor) + : tableHandle_(tableHandle), + config_(std::make_shared( + std::make_shared( + std::move(tableHandle->tableParameters())))), + inputType_(inputType), + outputType_(outputType), + keyType_(tableHandle_->keyType()), + valueType_(tableHandle_->valueType()), + connectorQueryCtx_(connectorQueryCtx), + numEqualJoinKeys_(numEqualJoinKeys), + conditionExprSet_( + joinConditionExpr != nullptr + ? connectorQueryCtx_->expressionEvaluator()->compile( + joinConditionExpr) + : nullptr), + pool_(connectorQueryCtx_->memoryPool()->shared_from_this()), + executor_(executor) { + VELOX_CHECK_LE(outputType_->size(), valueType_->size() + keyType_->size()); + VELOX_CHECK_LE(numEqualJoinKeys_, keyType_->size()); + for (int i = 0; i < numEqualJoinKeys_; ++i) { + VELOX_CHECK( + keyType_->childAt(i)->equivalent(*inputType_->childAt(i)), + "{} vs {}", + keyType_->toString(), + inputType_->toString()); + } + initOutputProjections(); + initConditionProjections(); + initLookupTable(); +} + +void FileSystemIndexSource::initOutputProjections() { + VELOX_CHECK(lookupOutputProjections_.empty()); + lookupOutputProjections_.reserve(outputType_->size()); + for (auto outputChannel = 0; outputChannel < outputType_->size(); + ++outputChannel) { + const auto outputName = outputType_->nameOf(outputChannel); + if (valueType_->containsChild(outputName)) { + const auto tableValueChannel = valueType_->getChildIdx(outputName); + // The hash table layout is: index columns, value columns. + lookupOutputProjections_.emplace_back( + keyType_->size() + tableValueChannel, outputChannel); + continue; + } + const auto tableKeyChannel = keyType_->getChildIdx(outputName); + lookupOutputProjections_.emplace_back(tableKeyChannel, outputChannel); + } + VELOX_CHECK_EQ(lookupOutputProjections_.size(), outputType_->size()); +} + +void FileSystemIndexSource::initConditionProjections() { + if (conditionExprSet_ == nullptr) { + return; + } + std::vector names; + std::vector types; + column_index_t outputChannel{0}; + for (const auto& field : conditionExprSet_->distinctFields()) { + names.push_back(field->name()); + types.push_back(field->type()); + if (inputType_->getChildIdxIfExists(field->name()).has_value()) { + conditionInputProjections_.emplace_back( + inputType_->getChildIdx(field->name()), outputChannel++); + continue; + } + conditionTableProjections_.emplace_back( + keyType_->getChildIdx(field->name()), outputChannel++); + } + conditionInputType_ = ROW(std::move(names), std::move(types)); +} + +std::shared_ptr +FileSystemIndexSource::lookup(const LookupRequest& request) { + checkNotFailed(); + VELOX_CHECK(lookupTable_ != nullptr); + const auto numInputRows = request.input->size(); + auto& hashTable = lookupTable_->table; + auto lookup = + std::make_unique(hashTable->hashers(), pool_.get()); + SelectivityVector activeRows(numInputRows); + VELOX_CHECK(activeRows.isAllSelected()); + hashTable->prepareForJoinProbe( + *lookup, request.input, activeRows, /*decodeAndRemoveNulls=*/true); + lookup->hits.resize(numInputRows); + std::fill(lookup->hits.data(), lookup->hits.data() + numInputRows, nullptr); + if (!lookup->rows.empty()) { + hashTable->joinProbe(*lookup); + } + // Update lookup rows to include all input rows as it might be used by left + // join. + auto& rows = lookup->rows; + rows.resize(request.input->size()); + std::iota(rows.begin(), rows.end(), 0); + return std::make_shared( + this->shared_from_this(), + request, + std::move(lookup), + tableHandle_->asyncLookup() ? executor_.get() : nullptr); +} + +void FileSystemIndexSource::recordCpuTiming(const CpuWallTiming& timing) { + VELOX_CHECK_EQ(timing.count, 1); + std::lock_guard l(mutex_); + if (timing.wallNanos != 0) { + exec::addOperatorRuntimeStats( + exec::IndexLookupJoin::kConnectorLookupWallTime, + RuntimeCounter(timing.wallNanos, RuntimeCounter::Unit::kNanos), + runtimeStats_); + exec::addOperatorRuntimeStats( + exec::IndexLookupJoin::kClientLookupWaitWallTime, + RuntimeCounter(timing.wallNanos, RuntimeCounter::Unit::kNanos), + runtimeStats_); + } + if (timing.cpuNanos != 0) { + exec::addOperatorRuntimeStats( + exec::IndexLookupJoin::kConnectorResultPrepareTime, + RuntimeCounter(timing.cpuNanos, RuntimeCounter::Unit::kNanos), + runtimeStats_); + } +} + +void FileSystemIndexSource::checkNotFailed() { + if (!error_.empty()) { + VELOX_FAIL("TestIndexSource failed: {}", error_); + } +} + +std::unordered_map +FileSystemIndexSource::runtimeStats() { + std::lock_guard l(mutex_); + return runtimeStats_; +} + +const std::shared_ptr +FileSystemIndexSource::createIndexTable( + int numEqualJoinKeys, + const RowVectorPtr& keyData, + const RowVectorPtr& valueData) { + const auto keyType = + std::dynamic_pointer_cast(keyData->type()); + VELOX_CHECK_GE(keyType->size(), 1); + VELOX_CHECK_GE(keyType->size(), numEqualJoinKeys); + auto valueType = std::dynamic_pointer_cast(valueData->type()); + VELOX_CHECK_GE(valueType->size(), 1); + const auto numRows = keyData->size(); + VELOX_CHECK_EQ(numRows, valueData->size()); + + std::vector> hashers; + hashers.reserve(numEqualJoinKeys); + std::vector keyVectors; + keyVectors.reserve(numEqualJoinKeys); + for (auto i = 0; i < numEqualJoinKeys; ++i) { + hashers.push_back( + std::make_unique(keyType->childAt(i), i)); + keyVectors.push_back(keyData->childAt(i)); + } + + std::vector dependentTypes; + std::vector dependentVectors; + for (int i = numEqualJoinKeys; i < keyType->size(); ++i) { + dependentTypes.push_back(keyType->childAt(i)); + dependentVectors.push_back(keyData->childAt(i)); + } + for (int i = 0; i < valueType->size(); ++i) { + dependentTypes.push_back(valueType->childAt(i)); + dependentVectors.push_back(valueData->childAt(i)); + } + + // Create the table. + auto table = exec::HashTable::createForJoin( + std::move(hashers), + /*dependentTypes=*/dependentTypes, + /*allowDuplicates=*/true, + /*hasProbedFlag=*/false, + /*minTableSizeForParallelJoinBuild=*/1, + pool_.get()); + + // Insert data into the row container. + auto* rowContainer = table->rows(); + std::vector decodedVectors; + for (auto& vector : keyData->children()) { + decodedVectors.emplace_back(*vector); + } + for (auto& vector : valueData->children()) { + decodedVectors.emplace_back(*vector); + } + + for (auto row = 0; row < numRows; ++row) { + auto* newRow = rowContainer->newRow(); + + for (auto col = 0; col < decodedVectors.size(); ++col) { + rowContainer->store(decodedVectors[col], row, newRow, col); + } + } + + // Build the table index. + table->prepareJoinTable( + {}, exec::BaseHashTable::kNoSpillInputStartPartitionBit); + return std::make_shared( + std::move(keyType), std::move(valueType), std::move(table)); +} + +void FileSystemIndexSource::initLookupTable() { + VELOX_CHECK(config_ != nullptr); + auto fs = filesystems::getFileSystem(config_->getPath(), config_->config()); + VELOX_CHECK(fs != nullptr); + std::shared_ptr readFile = fs->openFileForRead(config_->getPath()); + VELOX_CHECK(readFile != nullptr); + std::unique_ptr input = + std::make_unique(readFile, *pool_); + + /// current only support text format + text::RowReaderOptions rowReaderOptions( + pool_.get(), + config_->getFieldDelimiter().data(), + config_->getMaxReadRows(), + config_->getMaxReadBytes()); + rowReaderOptions.setFileFormat(dwio::common::FileFormat::TEXT); + rowReaderOptions.setFileSchema(tableHandle_->tableSchema()); + + auto readerFactory = dwio::common::getReaderFactory(config_->getFormat()); + auto textReader = + readerFactory->createReader(std::move(input), rowReaderOptions); + VELOX_CHECK(textReader != nullptr); + auto textRowReader = textReader->createRowReader(rowReaderOptions); + VELOX_CHECK(textRowReader != nullptr); + + RowVectorPtr rows = + RowVector::createEmpty(tableHandle_->tableSchema(), pool_.get()); + VectorPtr t = + RowVector::createEmpty(tableHandle_->tableSchema(), pool_.get()); + while (textRowReader->next(config_->getMaxReadRows(), t, nullptr) != 0) { + RowVectorPtr r = std::dynamic_pointer_cast(t); + VELOX_CHECK(r->childrenSize() == rows->childrenSize()); + rows->append(r.get()); + t->prepareForReuse(); + } + if (rows != nullptr) { + VELOX_CHECK(keyType_ != nullptr); + VELOX_CHECK(valueType_ != nullptr); + std::vector keys; + std::vector values; + for (const auto& name : keyType_->names()) { + keys.emplace_back(rows->childAt(name)); + } + for (const auto& name : valueType_->names()) { + values.emplace_back(rows->childAt(name)); + } + lookupTable_ = createIndexTable( + numEqualJoinKeys_, + std::make_shared( + rows->pool(), keyType_, nullptr, rows->size(), keys), + std::make_shared( + rows->pool(), valueType_, nullptr, rows->size(), values)); + } +} + +FileSystemIndexSource::ResultIterator::ResultIterator( + std::shared_ptr source, + const LookupRequest& request, + std::unique_ptr lookupResult, + folly::Executor* executor) + : source_(std::move(source)), + request_(request), + lookupResult_(std::move(lookupResult)), + executor_(executor) { + // Initialize the lookup result iterator. + lookupResultIter_ = std::make_unique( + std::vector{}, 0, /*estimatedRowSize=*/1); + lookupResultIter_->reset(*lookupResult_); +} + +std::optional> +FileSystemIndexSource::ResultIterator::next( + vector_size_t size, + ContinueFuture& future) { + source_->checkNotFailed(); + + if (hasPendingRequest_.exchange(true)) { + VELOX_FAIL("Only one pending request is allowed at a time"); + } + + if (executor_ && !asyncResult_.has_value()) { + asyncLookup(size, future); + return std::nullopt; + } + + SCOPE_EXIT { + hasPendingRequest_ = false; + }; + if (asyncResult_.has_value()) { + VELOX_CHECK_NOT_NULL(executor_); + auto result = std::move(asyncResult_.value()); + asyncResult_.reset(); + return result; + } + return syncLookup(size); +} + +void extractColumns( + exec::BaseHashTable* table, + folly::Range rows, + folly::Range projections, + memory::MemoryPool* pool, + const std::vector& resultTypes, + std::vector& resultVectors) { + VELOX_CHECK_EQ(resultTypes.size(), resultVectors.size()); + for (auto projection : projections) { + const auto resultChannel = projection.outputChannel; + VELOX_CHECK_LT(resultChannel, resultVectors.size()); + auto& child = resultVectors[resultChannel]; + // TODO: Consider reuse of complex types. + if (!child || !BaseVector::isVectorWritable(child) || + !child->isFlatEncoding()) { + child = BaseVector::create(resultTypes[resultChannel], rows.size(), pool); + } + child->resize(rows.size()); + table->extractColumn(rows, projection.inputChannel, child); + } +} + +void FileSystemIndexSource::ResultIterator::extractLookupColumns( + folly::Range rows, + RowVectorPtr& result) { + if (result == nullptr) { + result = BaseVector::create( + source_->outputType(), rows.size(), source_->pool()); + } else { + VectorPtr output = std::move(result); + BaseVector::prepareForReuse(output, rows.size()); + result = std::static_pointer_cast(output); + } + VELOX_CHECK_EQ(result->size(), rows.size()); + extractColumns( + source_->indexTable()->table.get(), + rows, + source_->outputProjections(), + source_->pool_.get(), + source_->outputType_->children(), + lookupOutput_->children()); +} + +void FileSystemIndexSource::ResultIterator::asyncLookup( + vector_size_t size, + ContinueFuture& future) { + VELOX_CHECK_NOT_NULL(executor_); + VELOX_CHECK(!asyncResult_.has_value()); + VELOX_CHECK(hasPendingRequest_); + auto [lookupPromise, lookupFuture] = + makeVeloxContinuePromiseContract("ResultIterator::asyncLookup"); + future = std::move(lookupFuture); + auto asyncPromise = + std::make_shared(std::move(lookupPromise)); + executor_->add([this, size, promise = std::move(asyncPromise)]() mutable { + VELOX_CHECK(!asyncResult_.has_value()); + VELOX_CHECK(hasPendingRequest_); + SCOPE_EXIT { + hasPendingRequest_ = false; + promise->setValue(); + }; + asyncResult_ = syncLookup(size); + }); +} + +std::unique_ptr +FileSystemIndexSource::ResultIterator::syncLookup(vector_size_t size) { + VELOX_CHECK(hasPendingRequest_); + if (lookupResultIter_->atEnd()) { + return nullptr; + } + + CpuWallTiming timing; + SCOPE_EXIT { + source_->recordCpuTiming(timing); + }; + CpuWallTimer timer{timing}; + try { + initBuffer(size, outputRowMapping_, rawOutputRowMapping_); + initBuffer(size, inputRowMapping_, rawInputRowMapping_); + auto numOut = source_->indexTable()->table->listJoinResults( + *lookupResultIter_, + /*includeMisses=*/true, + folly::Range(rawInputRowMapping_, size), + folly::Range(rawOutputRowMapping_, size), + // TODO: support max bytes output later. + /*maxBytes=*/std::numeric_limits::max()); + outputRowMapping_->setSize(numOut * sizeof(char*)); + inputRowMapping_->setSize(numOut * sizeof(vector_size_t)); + + if (numOut == 0) { + VELOX_CHECK(lookupResultIter_->atEnd()); + return nullptr; + } + + evalJoinConditions(); + + initBuffer(numOut, inputHitIndices_, rawInputHitIndices_); + auto numHits{0}; + for (auto i = 0; i < numOut; ++i) { + if (rawOutputRowMapping_[i] == nullptr) { + continue; + } + VELOX_CHECK_LE(numHits, i); + rawOutputRowMapping_[numHits] = rawOutputRowMapping_[i]; + rawInputHitIndices_[numHits] = rawInputRowMapping_[i]; + if (numHits > 0) { + // Make sure the input hit indices are in ascending order. + VELOX_CHECK_GE( + rawInputHitIndices_[numHits], rawInputHitIndices_[numHits - 1]); + } + ++numHits; + } + outputRowMapping_->setSize(numHits * sizeof(char*)); + inputHitIndices_->setSize(numHits * sizeof(vector_size_t)); + extractLookupColumns( + folly::Range(rawOutputRowMapping_, numHits), + lookupOutput_); + VELOX_CHECK_EQ(lookupOutput_->size(), numHits); + VELOX_CHECK_EQ(inputHitIndices_->size() / sizeof(vector_size_t), numHits); + if (lookupOutput_->size() == 0) { + return nullptr; + } else { + return std::make_unique(inputHitIndices_, lookupOutput_); + } + } catch (const std::exception& e) { + VELOX_CHECK(source_->error_.empty()); + source_->error_ = e.what(); + return nullptr; + } +} + +void FileSystemIndexSource::ResultIterator::evalJoinConditions() { + if (source_->conditionExprSet_ == nullptr) { + return; + } + std::lock_guard l(source_->mutex_); + const auto conditionInput = createConditionInput(); + source_->connectorQueryCtx_->expressionEvaluator()->evaluate( + source_->conditionExprSet_.get(), + source_->conditionFilterInputRows_, + *conditionInput, + source_->conditionFilterResult_); + source_->decodedConditionFilterResult_.decode( + *source_->conditionFilterResult_, source_->conditionFilterInputRows_); + + const auto numRows = outputRowMapping_->size() / sizeof(char*); + for (auto row = 0; row < numRows; ++row) { + if (!joinConditionPassed(row)) { + rawOutputRowMapping_[row] = nullptr; + } + } +} + +RowVectorPtr FileSystemIndexSource::ResultIterator::createConditionInput() { + VELOX_CHECK_EQ( + inputRowMapping_->size() / sizeof(vector_size_t), + outputRowMapping_->size() / sizeof(char*)); + const auto numRows = outputRowMapping_->size() / sizeof(char*); + source_->conditionFilterInputRows_.resize(numRows); + std::vector filterColumns(source_->conditionInputType_->size()); + for (const auto& projection : source_->conditionInputProjections_) { + request_.input->childAt(projection.inputChannel)->loadedVector(); + filterColumns[projection.outputChannel] = exec::wrapChild( + numRows, + inputRowMapping_, + request_.input->childAt(projection.inputChannel)); + } + + extractColumns( + source_->indexTable()->table.get(), + folly::Range(rawOutputRowMapping_, numRows), + source_->conditionTableProjections_, + source_->pool_.get(), + source_->conditionInputType_->children(), + filterColumns); + + return std::make_shared( + source_->pool_.get(), + source_->conditionInputType_, + nullptr, + numRows, + std::move(filterColumns)); +} + +} // namespace facebook::velox::connector::filesystem diff --git a/velox/connectors/filesystem/FileSystemIndexSource.h b/velox/connectors/filesystem/FileSystemIndexSource.h new file mode 100644 index 000000000000..e041ca33679d --- /dev/null +++ b/velox/connectors/filesystem/FileSystemIndexSource.h @@ -0,0 +1,198 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/connectors/filesystem/FileSystemConfig.h" +#include "velox/connectors/filesystem/FileSystemIndexTable.h" +#include "velox/connectors/filesystem/FileSystemIndexTableHandle.h" +#include "velox/dwio/common/Reader.h" +#include "velox/exec/HashTable.h" +#include "velox/exec/Operator.h" +#include "velox/expression/Expr.h" +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::connector::filesystem { + +class FileSystemIndexSource + : public connector::IndexSource, + public std::enable_shared_from_this { + public: + FileSystemIndexSource( + const RowTypePtr& inputType, + const RowTypePtr& outputType, + size_t numEqualJoinKeys, + const core::TypedExprPtr& joinConditionExpr, + const std::shared_ptr& tableHandle, + connector::ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr& executor); + + std::shared_ptr lookup( + const LookupRequest& request) override; + + std::unordered_map runtimeStats() override; + + memory::MemoryPool* pool() const { + return pool_.get(); + } + + const std::shared_ptr& indexTable() const { + return lookupTable_; + } + + const RowTypePtr& outputType() const { + return outputType_; + } + + const std::vector& outputProjections() const { + return lookupOutputProjections_; + } + + class ResultIterator : public LookupResultIterator { + public: + ResultIterator( + std::shared_ptr source, + const LookupRequest& request, + std::unique_ptr lookupResult, + folly::Executor* executor); + + std::optional> next( + vector_size_t size, + ContinueFuture& future) override; + + private: + // Initializes the buffer used to store row pointers or indices for output + // match result processing. + template + void initBuffer(vector_size_t size, BufferPtr& buffer, T*& rawBuffer) { + if (!buffer || !buffer->unique() || + buffer->capacity() < sizeof(T) * size) { + buffer = AlignedBuffer::allocate(size, source_->pool_.get(), T()); + } + rawBuffer = buffer->asMutable(); + } + + void evalJoinConditions(); + + // Check if a given equality matched 'row' has passed join conditions. + inline bool joinConditionPassed(vector_size_t row) const { + return source_->conditionFilterInputRows_.isValid(row) && + !source_->decodedConditionFilterResult_.isNullAt(row) && + source_->decodedConditionFilterResult_.valueAt(row); + } + + // Creates input vector for join condition evaluation. + RowVectorPtr createConditionInput(); + + // Extracts the lookup result columns from the index table and return in + // 'result'. + void extractLookupColumns( + folly::Range rows, + RowVectorPtr& result); + + // Inokved to trigger async lookup using background executor and return the + // 'future'. + void asyncLookup(vector_size_t size, ContinueFuture& future); + + // Synchronously lookup the index table and return up to 'size' number of + // output rows in result. + std::unique_ptr syncLookup(vector_size_t size); + + const std::shared_ptr source_; + const LookupRequest request_; + const std::unique_ptr lookupResult_; + folly::Executor* const executor_{nullptr}; + + std::atomic_bool hasPendingRequest_{false}; + std::unique_ptr lookupResultIter_; + std::optional> asyncResult_; + + // The reusable buffers for lookup result processing. + // The input row number in lookup request for each matched result which is + // paired with 'outputRowMapping_' to indicate if a given input row has + // match or not. If the corresponding output row pointer in + // 'outputRowMapping_' is null, then there is no match for the given input + // row pointed by 'inputRowMapping_'. + BufferPtr inputRowMapping_; + vector_size_t* rawInputRowMapping_{nullptr}; + // Points to the matched row pointer in 'indexTable_' for each input row. If + // there is a miss for a given input, then this is set to null. + BufferPtr outputRowMapping_; + char** rawOutputRowMapping_{nullptr}; + // The input row number in request for each output row in the returned + // lookup result. Any gap in the input row numbers means the corresponding + // input rows that has no matches in the index table. + BufferPtr inputHitIndices_; + vector_size_t* rawInputHitIndices_{nullptr}; + + RowVectorPtr lookupOutput_; + }; + + private: + // Invoked to check if this source has already encountered async lookup error, + // and throws if it has. + void checkNotFailed(); + + // Initialize the output projections for lookup result processing. + void initOutputProjections(); + + // Initialize the condition filter input type and projections if configured. + void initConditionProjections(); + + // Initialize the lookup table, load the data into table. + void initLookupTable(); + + void recordCpuTiming(const CpuWallTiming& timing); + + const std::shared_ptr createIndexTable( + int numEqualJoinKeys, + const RowVectorPtr& keyData, + const RowVectorPtr& valueData); + + std::shared_ptr lookupTable_; + const std::shared_ptr tableHandle_; + const std::shared_ptr config_; + const RowTypePtr inputType_; + const RowTypePtr outputType_; + const RowTypePtr keyType_; + const RowTypePtr valueType_; + connector::ConnectorQueryCtx* const connectorQueryCtx_; + const size_t numEqualJoinKeys_; + const std::unique_ptr conditionExprSet_; + const std::shared_ptr pool_; + std::shared_ptr executor_; + + mutable std::mutex mutex_; + + // Join condition filter input type. + RowTypePtr conditionInputType_; + + // If not empty, set to the first encountered async error. + std::string error_; + + // Reusable memory for join condition filter evaluation. + VectorPtr conditionFilterResult_; + DecodedVector decodedConditionFilterResult_; + SelectivityVector conditionFilterInputRows_; + // Column projections for join condition input and lookup output. + std::vector conditionInputProjections_; + std::vector conditionTableProjections_; + std::vector lookupOutputProjections_; + std::unordered_map runtimeStats_; +}; +} // namespace facebook::velox::connector::filesystem diff --git a/velox/connectors/filesystem/FileSystemIndexTable.h b/velox/connectors/filesystem/FileSystemIndexTable.h new file mode 100644 index 000000000000..ce44ea2374f0 --- /dev/null +++ b/velox/connectors/filesystem/FileSystemIndexTable.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/HashTable.h" +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::connector::filesystem { + +struct FileSystemIndexTable { + RowTypePtr keyType; + RowTypePtr dataType; + std::shared_ptr table; + + FileSystemIndexTable( + RowTypePtr _keyType, + RowTypePtr _dataType, + std::shared_ptr _table) + : keyType(std::move(_keyType)), + dataType(std::move(_dataType)), + table(std::move(_table)) {} +}; + +} // namespace facebook::velox::connector::filesystem diff --git a/velox/connectors/filesystem/FileSystemIndexTableHandle.h b/velox/connectors/filesystem/FileSystemIndexTableHandle.h new file mode 100644 index 000000000000..3f2f8bfb817f --- /dev/null +++ b/velox/connectors/filesystem/FileSystemIndexTableHandle.h @@ -0,0 +1,165 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/connectors/Connector.h" + +namespace facebook::velox::connector::filesystem { + +class FileSystemIndexTableHandle : public connector::ConnectorTableHandle { + public: + FileSystemIndexTableHandle( + std::string connectorId, + const std::string& tableName, + const RowTypePtr& tableSchema, + const std::vector& keyFields, + bool asyncLookup = false, + const std::unordered_map& tableParameters = {}) + : ConnectorTableHandle(std::move(connectorId)), + tableName_(tableName), + tableSchema_(tableSchema), + keyFields_(keyFields), + asyncLookup_(asyncLookup), + tableParameters_(tableParameters) {} + + ~FileSystemIndexTableHandle() override = default; + + std::string toString() const override { + return fmt::format( + "IndexTableHandle: tableName: {}, tableSchema: {}, asyncLookup: {}", + tableName_, + tableSchema_->toString(), + asyncLookup_); + } + + const std::string& name() const override { + return tableName_; + } + + const RowTypePtr keyType() { + std::vector keyNames; + std::vector keyTypes; + const std::vector& fieldNames = tableSchema_->names(); + for (size_t i = 0; i < keyFields_.size(); ++i) { + keyNames.emplace_back(fieldNames[i]); + keyTypes.emplace_back(tableSchema_->childAt(i)); + } + return std::make_shared( + std::move(keyNames), std::move(keyTypes)); + } + + const RowTypePtr valueType() { + std::vector valueNames; + std::vector valueTypes; + const std::vector& fieldNames = tableSchema_->names(); + for (int32_t i = 0; i < tableSchema_->children().size(); ++i) { + if (std::find(keyFields_.begin(), keyFields_.end(), i) == + keyFields_.end()) { + valueNames.emplace_back(fieldNames[i]); + valueTypes.emplace_back(tableSchema_->childAt(i)); + } + } + return std::make_shared( + std::move(valueNames), std::move(valueTypes)); + } + + const RowTypePtr tableSchema() { + return tableSchema_; + } + + std::unordered_map& tableParameters() { + return tableParameters_; + } + + const std::vector keyFields() { + return keyFields_; + } + + bool supportsIndexLookup() const override { + return true; + } + + folly::dynamic serialize() const override { + folly::dynamic obj = folly::dynamic::object; + obj["tableName"] = name(); + obj["connectorId"] = connectorId(); + obj["asyncLookup"] = asyncLookup_; + if (tableSchema_) { + obj["tableSchema"] = tableSchema_->serialize(); + } + folly::dynamic keyFieldsArray = folly::dynamic::array; + for (const auto& keyField : keyFields_) { + keyFieldsArray.push_back(keyField); + } + obj["keyFields"] = keyFieldsArray; + folly::dynamic tableParameters = folly::dynamic::object; + for (const auto& param : tableParameters_) { + tableParameters[param.first] = param.second; + } + obj["tableParameters"] = tableParameters; + return obj; + } + + static std::shared_ptr create( + const folly::dynamic& obj, + void* context) { + std::string connectorId = obj["connectorId"].asString(); + std::string tableName = obj["tableName"].asString(); + bool asyncLookup = obj["asyncLookup"].asBool(); + std::vector keyFields; + const auto keyFieldsArray = obj["keyFields"]; + for (const auto& item : keyFieldsArray) { + keyFields.emplace_back(item.asInt()); + } + RowTypePtr tableSchema; + if (auto it = obj.find("tableSchema"); it != obj.items().end()) { + tableSchema = ISerializable::deserialize(it->second, context); + } + std::unordered_map tableParameters{}; + const auto& tableParametersObj = obj["tableParameters"]; + for (const auto& key : tableParametersObj.keys()) { + const auto& value = tableParametersObj[key]; + tableParameters.emplace(key.asString(), value.asString()); + } + return std::make_shared( + connectorId, + tableName, + tableSchema, + keyFields, + asyncLookup, + tableParameters); + } + + static void registerSerDe() { + auto& registry = DeserializationWithContextRegistryForSharedPtr(); + registry.Register("FileSystemIndexTableHandle", create); + } + + /// If true, we returns the lookup result asynchronously for testing purpose. + bool asyncLookup() const { + return asyncLookup_; + } + + private: + const std::string tableName_; + const RowTypePtr tableSchema_; + const std::vector keyFields_; + const bool asyncLookup_; + std::unordered_map tableParameters_; +}; + +} // namespace facebook::velox::connector::filesystem diff --git a/velox/connectors/filesystem/tests/FileSystemConnectorTest.cpp b/velox/connectors/filesystem/tests/FileSystemConnectorTest.cpp index 203ef9b58fc9..808c3bfe1e70 100644 --- a/velox/connectors/filesystem/tests/FileSystemConnectorTest.cpp +++ b/velox/connectors/filesystem/tests/FileSystemConnectorTest.cpp @@ -40,8 +40,6 @@ TEST_F(FileSystemConnectorTest, testConfig) { writeConfig->exists(connector::filesystem::FileSystemWriteConfig::kPath)); ASSERT_TRUE(writeConfig->exists( connector::filesystem::FileSystemWriteConfig::kFormat)); - ASSERT_TRUE(writeConfig->exists( - connector::filesystem::FileSystemWriteConfig::kTaskId)); } TEST_F(FileSystemConnectorTest, testWriteNonPartitionedTable) { diff --git a/velox/dwio/text/CMakeLists.txt b/velox/dwio/text/CMakeLists.txt index 844a12ffd601..d2e8bc725cf2 100644 --- a/velox/dwio/text/CMakeLists.txt +++ b/velox/dwio/text/CMakeLists.txt @@ -17,7 +17,10 @@ if(${VELOX_BUILD_TESTING}) endif() add_subdirectory(writer) +add_subdirectory(reader) -velox_add_library(velox_dwio_text_writer_register RegisterTextWriter.cpp) +velox_add_library(velox_dwio_text_reader_writer_register RegisterTextReader.cpp + RegisterTextWriter.cpp) -velox_link_libraries(velox_dwio_text_writer_register velox_dwio_text_writer) +velox_link_libraries(velox_dwio_text_reader_writer_register + velox_dwio_text_reader_writer) \ No newline at end of file diff --git a/velox/dwio/text/RegisterTextReader.cpp b/velox/dwio/text/RegisterTextReader.cpp new file mode 100644 index 000000000000..2582a7335a6e --- /dev/null +++ b/velox/dwio/text/RegisterTextReader.cpp @@ -0,0 +1,28 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/dwio/text/reader/TextReader.h" + +namespace facebook::velox::text { + +void registerTextReaderFactory() { + dwio::common::registerReaderFactory(std::make_shared()); +} + +void unregisterTextReaderFactory() { + dwio::common::unregisterReaderFactory(dwio::common::FileFormat::TEXT); +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/RegisterTextReader.h b/velox/dwio/text/RegisterTextReader.h new file mode 100644 index 000000000000..a2e1700db61e --- /dev/null +++ b/velox/dwio/text/RegisterTextReader.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::text { + +void registerTextReaderFactory(); + +void unregisterTextReaderFactory(); + +} // namespace facebook::velox::text \ No newline at end of file diff --git a/velox/dwio/text/reader/CMakeLists.txt b/velox/dwio/text/reader/CMakeLists.txt new file mode 100644 index 000000000000..6d4d35609632 --- /dev/null +++ b/velox/dwio/text/reader/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +velox_add_library(velox_dwio_text_reader TextReader.cpp) + +velox_link_libraries(velox_dwio_text_reader velox_dwio_common fmt::fmt) \ No newline at end of file diff --git a/velox/dwio/text/reader/TextReader.cpp b/velox/dwio/text/reader/TextReader.cpp new file mode 100644 index 000000000000..22b96321b462 --- /dev/null +++ b/velox/dwio/text/reader/TextReader.cpp @@ -0,0 +1,211 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/reader/TextReader.h" +#include "velox/dwio/common/MetricsLog.h" +#include "velox/type/StringView.h" +#include "velox/type/Timestamp.h" +#include "velox/type/TimestampConversion.h" + +#include +#include +#include + +namespace facebook::velox::text { + +TextRowReader::TextRowReader( + const RowTypePtr& schema, + const std::shared_ptr& fileInput, + const RowReaderOptions& options) + : schema_(schema), + options_(options), + fileInput_(fileInput), + fileSize_(fileInput_->getReadFile()->size()) {} + +uint64_t TextRowReader::next( + uint64_t maxRowsToRead, + velox::VectorPtr& result, + const dwio::common::Mutation*) { + if (totalReadBytes_ >= fileSize_) { + readFinished_ = true; + readRows_ = 0; + return 0; + } + VELOX_CHECK(fileInput_ != nullptr); + size_t sizeToRead = options_.maxReadSize(); + if (fileSize_ - totalReadBytes_ < sizeToRead) { + sizeToRead = fileSize_ - totalReadBytes_; + } + std::vector dataToRead(sizeToRead); + fileInput_->read( + dataToRead.data(), + sizeToRead, + totalReadBytes_, + dwio::common::MetricsLog::MetricsType::FILE); + std::string_view s(dataToRead.data(), sizeToRead); + const size_t lastLineDelimiterPos = s.rfind(lineDelimiter_); + if (lastLineDelimiterPos != std::string::npos) { + s = s.substr(0, lastLineDelimiterPos + 1); + } + std::vector lines; + boost::split(lines, s, boost::is_any_of(lineDelimiter_)); + auto readFields = [&](RowVectorPtr& rowVector, + const std::string& line, + const size_t rowIndex) -> void { + std::vector fields; + boost::split(fields, line, boost::is_any_of(options_.fieldDemiliter())); + VELOX_CHECK(fields.size() == rowVector->childrenSize()); + for (size_t j = 0; j < fields.size(); ++j) { + deserialize( + rowVector->childAt(j), schema_->childAt(j), rowIndex, fields[j]); + } + }; + RowVectorPtr row = std::dynamic_pointer_cast(result); + const size_t rowsToRead = + maxRowsToRead > lines.size() - 1 ? lines.size() - 1 : maxRowsToRead; + row->resize(rowsToRead); + /// TODO: Combine the implemention of read one line and multiple lines. + if (rowsToRead > 0) { + for (size_t i = 0; i < rowsToRead; ++i) { + readFields(row, lines[i], i); + totalReadRows_++; + totalReadBytes_ += lines[i].size() + 1; + } + readRows_ = lines.size() - 1; + } else if (lines.size() == 1) { + readFields(row, lines[0], 0); + totalReadRows_++; + totalReadBytes_ += lines[0].size() + 1; + readRows_ = 1; + } + return readRows_; +} + +int64_t TextRowReader::nextRowNumber() { + return options_.maxReadRows(); +} + +int64_t TextRowReader::nextReadSize(uint64_t size) { + return readRows_; +} + +template +const inline T convertTo( + const std::string& s, + const T& defaultValue, + std::optional& errMsg) { + if constexpr (std::is_same_v || std::is_same_v) { + if (s == "NaN") { + return std::numeric_limits::quiet_NaN(); + } + } + auto result = folly::tryTo(s); + if (result.hasValue()) { + return result.value(); + } else { + std::stringstream ss; + ss << "Failed to convert " << s << " to type:" << typeid(T).name(); + errMsg.emplace(ss.str()); + return defaultValue; + } +} + +const void TextRowReader::deserialize( + VectorPtr& field, + const TypePtr& type, + const size_t index, + const std::string& s) { + std::optional errMsg; + switch (type->kind()) { + case TypeKind::BOOLEAN: + field->asFlatVector()->set( + index, convertTo(s, false, errMsg)); + break; + case TypeKind::TINYINT: + field->asFlatVector()->set( + index, convertTo(s, 0, errMsg)); + break; + case TypeKind::SMALLINT: + field->asFlatVector()->set( + index, convertTo(s, 0, errMsg)); + break; + case TypeKind::INTEGER: + field->asFlatVector()->set( + index, convertTo(s, 0, errMsg)); + break; + case TypeKind::BIGINT: + field->asFlatVector()->set( + index, convertTo(s, 0, errMsg)); + break; + case TypeKind::REAL: + field->asFlatVector()->set(index, convertTo(s, 0, errMsg)); + break; + case TypeKind::DOUBLE: + field->asFlatVector()->set( + index, convertTo(s, 0, errMsg)); + break; + case TypeKind::VARCHAR: { + StringView sv(s.data(), s.size()); + field->asFlatVector()->set(index, sv); + break; + } + case TypeKind::TIMESTAMP: { + const auto timestamp = + util::fromTimestampString( + s.data(), s.size(), util::TimestampParseMode::kLegacyCast) + .thenOrThrow(folly::identity, [&](const Status& status) { + VELOX_FAIL("error while parse timestamp: {}", status.message()); + }); + field->asFlatVector()->set(index, timestamp); + break; + } + default: + VELOX_UNSUPPORTED( + "The type of {} is not supported currently.", type->name()); + } + if (errMsg.has_value()) { + VELOX_FAIL(errMsg.value()); + } +} + +std::optional TextRowReader::estimatedRowSize() const { + VELOX_CHECK(schema_ != nullptr); + const std::vector>& fieldTypes = + schema_->children(); + std::optional result; + size_t rowSize = 0; + for (const auto& fieldType : fieldTypes) { + VELOX_CHECK(fieldType->isPrimitiveType()); + rowSize += fieldType->cppSizeInBytes(); + } + return result.emplace(rowSize); +} + +std::unique_ptr TextReader::createRowReader( + const dwio::common::RowReaderOptions& options) const { + const RowReaderOptions* rowReaderOptions = + static_cast(&options); + return std::make_unique( + options_.fileSchema(), buffer_->getInputStream(), *rowReaderOptions); +} + +std::unique_ptr TextReaderFactory::createReader( + std::unique_ptr buffer, + const dwio::common::ReaderOptions& options) { + return std::make_unique(std::move(buffer), options); +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/reader/TextReader.h b/velox/dwio/text/reader/TextReader.h new file mode 100644 index 000000000000..3d119dfb5756 --- /dev/null +++ b/velox/dwio/text/reader/TextReader.h @@ -0,0 +1,151 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/common/InputStream.h" +#include "velox/dwio/common/Mutation.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/common/Reader.h" +#include "velox/dwio/common/ReaderFactory.h" +#include "velox/dwio/common/Statistics.h" +#include "velox/dwio/common/TypeWithId.h" +#include "velox/type/Type.h" +#include "velox/vector/BaseVector.h" + +namespace facebook::velox::text { + +class RowReaderOptions : public dwio::common::ReaderOptions, + public dwio::common::RowReaderOptions { + public: + RowReaderOptions( + memory::MemoryPool* pool, + const char* fieldDelimiter, + const uint64_t maxReadRows, + const uint64_t maxReadSize) + : ReaderOptions(pool), + fieldDemiliter_(fieldDelimiter), + maxReadRows_(maxReadRows), + maxReadSize_(maxReadSize) {} + + const size_t maxReadRows() const { + return maxReadRows_; + } + + const char* fieldDemiliter() const { + return fieldDemiliter_; + } + + const uint64_t maxReadSize() const { + return maxReadSize_; + } + + private: + const char* fieldDemiliter_; + const uint64_t maxReadRows_; + const uint64_t maxReadSize_; +}; + +class TextRowReader : public dwio::common::RowReader { + public: + TextRowReader( + const RowTypePtr& schema, + const std::shared_ptr& fileInput, + const RowReaderOptions& options); + + uint64_t next( + uint64_t maxRowsToRead, + velox::VectorPtr& result, + const dwio::common::Mutation* mutation = nullptr) override; + + int64_t nextRowNumber() override; + + int64_t nextReadSize(uint64_t size) override; + + void updateRuntimeStats( + dwio::common::RuntimeStatistics& stats) const override {} + + void resetFilterCaches() override {} + + std::optional estimatedRowSize() const override; + + private: + const RowTypePtr schema_; + const RowReaderOptions options_; + const std::shared_ptr fileInput_; + const size_t fileSize_; + const char* lineDelimiter_ = "\n"; + mutable int64_t readRows_ = 0; + mutable int64_t totalReadRows_ = 0; + mutable int64_t totalReadBytes_ = 0; + mutable bool readFinished_ = false; + + const void deserialize( + VectorPtr& field, + const TypePtr& type, + const size_t index, + const std::string& s); +}; + +class TextReader : public dwio::common::Reader { + public: + TextReader( + std::unique_ptr buffer, + const dwio::common::ReaderOptions& options) + : buffer_(std::move(buffer)), + options_(options), + typeId_(dwio::common::TypeWithId::create(options_.fileSchema())) { + VELOX_CHECK(options_.fileFormat() == dwio::common::FileFormat::TEXT); + } + + std::optional numberOfRows() const override { + std::optional rows; + return rows; + } + + std::unique_ptr columnStatistics( + uint32_t index) const override { + return nullptr; + } + + const RowTypePtr& rowType() const override { + return options_.fileSchema(); + } + + const std::shared_ptr& typeWithId() + const override { + return typeId_; + } + + std::unique_ptr createRowReader( + const dwio::common::RowReaderOptions& options = {}) const override; + + private: + const std::unique_ptr buffer_; + const dwio::common::ReaderOptions options_; + const std::shared_ptr typeId_; +}; + +class TextReaderFactory : public dwio::common::ReaderFactory { + public: + TextReaderFactory() : ReaderFactory(dwio::common::FileFormat::TEXT) {} + + std::unique_ptr createReader( + std::unique_ptr buffer, + const dwio::common::ReaderOptions& options) override; +}; +} // namespace facebook::velox::text diff --git a/velox/dwio/text/tests/CMakeLists.txt b/velox/dwio/text/tests/CMakeLists.txt index 34a05424d366..02b922a9f9f9 100644 --- a/velox/dwio/text/tests/CMakeLists.txt +++ b/velox/dwio/text/tests/CMakeLists.txt @@ -24,3 +24,4 @@ set(TEST_LINK_LIBS glog::glog) add_subdirectory(writer) +add_subdirectory(reader) diff --git a/velox/dwio/text/tests/reader/CMakeLists.txt b/velox/dwio/text/tests/reader/CMakeLists.txt new file mode 100644 index 000000000000..65f217259a04 --- /dev/null +++ b/velox/dwio/text/tests/reader/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_text_reader_test TextReaderTest.cpp) + +add_test( + NAME velox_text_reader_test + COMMAND velox_text_reader_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + +target_link_libraries( + velox_text_reader_test + velox_dwio_text_reader + velox_vector_test_lib + velox_exec_test_lib + GTest::gtest) \ No newline at end of file diff --git a/velox/dwio/text/tests/reader/TextReaderTest.cpp b/velox/dwio/text/tests/reader/TextReaderTest.cpp new file mode 100644 index 000000000000..365a9e9ceb1a --- /dev/null +++ b/velox/dwio/text/tests/reader/TextReaderTest.cpp @@ -0,0 +1,165 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/dwio/text/reader/TextReader.h" +#include +#include +#include +#include "velox/common/base/Fs.h" +#include "velox/common/config/Config.h" +#include "velox/common/file/FileSystems.h" +#include "velox/dwio/text/RegisterTextReader.h" +#include "velox/dwio/text/writer/TextWriter.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::test { +class TextReaderTest : public testing::Test, + public velox::test::VectorTestBase { + public: + void SetUp() override { + velox::filesystems::registerLocalFileSystem(); + dwio::common::LocalFileSink::registerFactory(); + text::registerTextReaderFactory(); + rootPool_ = memory::memoryManager()->addRootPool("TextReaderTests"); + leafPool_ = rootPool_->addLeafChild("TextReaderTests"); + schema_ = + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}, + {BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + REAL(), + DOUBLE(), + TIMESTAMP(), + VARCHAR()}); + std::shared_ptr tempPath = + exec::test::TempDirectoryPath::create(); + filePath_ = + fs::path(fmt::format("{}/test_text_reader.txt", tempPath->getPath())); + initFileData(); + initFileReader(); + } + + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + void initFileData() { + auto data = makeRowVector( + {"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}, + {makeConstant(true, 3), + makeFlatVector({1, 2, 3}), + makeFlatVector({1, 2, 3}), // TODO null + makeFlatVector({1, 2, 3}), + makeFlatVector({1, 2, 3}), + makeFlatVector({1.1, kInf, 3.1}), + makeFlatVector({1.1, kNaN, 3.1}), + makeFlatVector( + 3, [](auto i) { return Timestamp(i, i * 1'000'000); }), + makeFlatVector({"hello", "world", "cpp"}, VARCHAR())}); + + text::WriterOptions writerOptions; + writerOptions.memoryPool = rootPool_.get(); + auto sink = std::make_unique( + filePath_, dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + schema_, + std::move(sink), + std::make_shared(writerOptions)); + for (int i = 0; i < 1024; ++i) { + writer->write(data); + writer->flush(); + } + writer->close(); + } + + void initFileReader() { + std::unordered_map configMap; + auto fs = filesystems::getFileSystem( + filePath_, std::make_shared(std::move(configMap))); + std::shared_ptr readFile = fs->openFileForRead(filePath_); + VELOX_CHECK(readFile != nullptr); + std::unique_ptr input = + std::make_unique(readFile, *pool_); + text::RowReaderOptions rowReaderOptions(pool_.get(), "\x01", 1, 1024); + rowReaderOptions.setFileFormat(dwio::common::FileFormat::TEXT); + rowReaderOptions.setFileSchema(schema_); + auto readerFactory = + dwio::common::getReaderFactory(dwio::common::FileFormat::TEXT); + auto textReader = + readerFactory->createReader(std::move(input), rowReaderOptions); + VELOX_CHECK(textReader != nullptr); + reader_ = textReader->createRowReader(rowReaderOptions); + } + + constexpr static float kInf = std::numeric_limits::infinity(); + constexpr static double kNaN = std::numeric_limits::quiet_NaN(); + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + std::string filePath_; + RowTypePtr schema_; + std::unique_ptr reader_; +}; + +TEST_F(TextReaderTest, TestReadData) { + VectorPtr t = RowVector::createEmpty(schema_, leafPool_.get()); + VectorPtr res = RowVector::createEmpty(schema_, leafPool_.get()); + while (reader_->next(1024, t, nullptr) != 0) { + res->append(t.get()); + t->prepareForReuse(); + } + RowVectorPtr r = std::dynamic_pointer_cast(res); + ASSERT_TRUE(r->size() == 3072); + std::vector& vecs = r->children(); + ASSERT_TRUE(vecs.size() == 9); + auto v0 = vecs[0]->asFlatVector(); + auto v1 = vecs[1]->asFlatVector(); + auto v2 = vecs[2]->asFlatVector(); + auto v3 = vecs[3]->asFlatVector(); + auto v4 = vecs[4]->asFlatVector(); + auto v5 = vecs[5]->asFlatVector(); + auto v6 = vecs[6]->asFlatVector(); + auto v7 = vecs[7]->asFlatVector(); + auto v8 = vecs[8]->asFlatVector(); + ASSERT_TRUE( + v0->valueAt(0) == true && v0->valueAt(1) == true && + v0->valueAt(2) == true); + ASSERT_TRUE( + v1->valueAt(0) == 1 && v1->valueAt(1) == 2 && v1->valueAt(2) == 3); + ASSERT_TRUE( + v2->valueAt(0) == 1 && v2->valueAt(1) == 2 && v2->valueAt(2) == 3); + ASSERT_TRUE( + v3->valueAt(0) == 1 && v3->valueAt(1) == 2 && v3->valueAt(2) == 3); + ASSERT_TRUE( + v4->valueAt(0) == 1 && v4->valueAt(1) == 2 && v4->valueAt(2) == 3); + ASSERT_TRUE( + v5->valueAt(30) == 1.1f && v5->valueAt(31) == kInf && + v5->valueAt(32) == 3.1f); + ASSERT_TRUE( + v6->valueAt(0) == 1.1 && std::to_string(v6->valueAt(1)) == "nan" && + v6->valueAt(2) == 3.1); + for (size_t i = 0; i < 3; ++i) { + auto t = v7->valueAt(i); + ASSERT_TRUE(t.getSeconds() == i && t.getNanos() == i * 1'000'000); + } + ASSERT_TRUE( + v8->valueAt(3069).str() == "hello" && + v8->valueAt(3070).str() == "world" && v8->valueAt(3071).str() == "cpp"); +} + +} // namespace facebook::velox::test diff --git a/velox/experimental/stateful/CMakeLists.txt b/velox/experimental/stateful/CMakeLists.txt index 130eac4d85fa..1a678812eadb 100644 --- a/velox/experimental/stateful/CMakeLists.txt +++ b/velox/experimental/stateful/CMakeLists.txt @@ -28,6 +28,7 @@ velox_add_library( StatefulOperator.cpp StreamPartition.cpp StreamJoin.cpp + StreamLookupJoin.cpp StreamKeyedOperator.cpp WindowJoin.cpp KeySelector.cpp diff --git a/velox/experimental/stateful/StatefulOperator.cpp b/velox/experimental/stateful/StatefulOperator.cpp index 4c1990cb43a2..06a332012d4b 100644 --- a/velox/experimental/stateful/StatefulOperator.cpp +++ b/velox/experimental/stateful/StatefulOperator.cpp @@ -17,8 +17,6 @@ #include "velox/experimental/stateful/StatefulTask.h" #include "velox/experimental/stateful/StreamElement.h" -#include - namespace facebook::velox::stateful { void StatefulOperator::initialize() { diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 8409a1b3b505..f8b48c2687d4 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -53,6 +53,7 @@ #include "velox/experimental/stateful/StreamJoin.h" #include "velox/experimental/stateful/WatermarkAssigner.h" #include "velox/experimental/stateful/WindowAggregator.h" +#include "velox/experimental/stateful/StreamLookupJoin.h" #include "velox/experimental/stateful/WindowJoin.h" #include "velox/experimental/stateful/GroupWindowAggregator.h" #include "velox/experimental/stateful/window/GroupWindowAggsHandler.h" @@ -103,6 +104,9 @@ StatefulOperatorPtr StatefulPlanner::nodeToStatefulOperator( std::move(op), partitionNode->partition()->partitionFunctionSpec(), numPartitions); + } else if (auto indexLookupJoinNode = + std::dynamic_pointer_cast(statefulNode->node())) { + return std::make_unique(std::move(op), std::move(targets)); } else if ( auto joinNode = std::dynamic_pointer_cast(statefulNode->node())) { diff --git a/velox/experimental/stateful/StreamLookupJoin.cpp b/velox/experimental/stateful/StreamLookupJoin.cpp new file mode 100644 index 000000000000..afa388857077 --- /dev/null +++ b/velox/experimental/stateful/StreamLookupJoin.cpp @@ -0,0 +1,42 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/experimental/stateful/StreamLookupJoin.h" +#include + +namespace facebook::velox::stateful { + +void StreamLookupJoin::addInput(RowVectorPtr input) { + op()->addInput(input); +} + +void StreamLookupJoin::getOutput() { + RowVectorPtr output = nullptr; + while (true) { + RowVectorPtr t = op()->getOutput(); + if (t == nullptr) { + break; + } + if (output == nullptr) { + output = t; + } else { + output->append(t.get()); + } + } + if (output != nullptr) { + pushOutput(output); + } +} +} // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/StreamLookupJoin.h b/velox/experimental/stateful/StreamLookupJoin.h new file mode 100644 index 000000000000..bce5c9a6c52d --- /dev/null +++ b/velox/experimental/stateful/StreamLookupJoin.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/experimental/stateful/StatefulOperator.h" + +namespace facebook::velox::stateful { + +class StreamLookupJoin : public StatefulOperator { + public: + StreamLookupJoin( + std::unique_ptr op, + std::vector> targets) + : StatefulOperator(std::move(op), std::move(targets)) {} + + void addInput(RowVectorPtr input) override; + + void getOutput() override; +}; + +} // namespace facebook::velox::stateful From ea6a334f97a7919871de3fc3d0d55208a4c50433 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 22 Dec 2025 03:00:30 +0000 Subject: [PATCH 2/2] fix compiling problems --- velox/connectors/filesystem/FileSystemConfig.cpp | 4 ++-- velox/connectors/filesystem/FileSystemConfig.h | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/velox/connectors/filesystem/FileSystemConfig.cpp b/velox/connectors/filesystem/FileSystemConfig.cpp index 36f334c4dec6..7d68e6835589 100644 --- a/velox/connectors/filesystem/FileSystemConfig.cpp +++ b/velox/connectors/filesystem/FileSystemConfig.cpp @@ -39,7 +39,7 @@ const T FileSystemConfig::checkAndGetConfigValue( } } -const dwio::common::FileFormat FileSystemWriteConfig::getFormat() { +const dwio::common::FileFormat FileSystemConfig::getFormat() { const std::string format = checkAndGetConfigValue(kFormat, ""); if (supportedFileFormats.find(format) != supportedFileFormats.end()) { return supportedFileFormats.at(format); @@ -48,7 +48,7 @@ const dwio::common::FileFormat FileSystemWriteConfig::getFormat() { } } -const std::string FileSystemWriteConfig::getPath() { +const std::string FileSystemConfig::getPath() { return checkAndGetConfigValue(kPath, ""); } diff --git a/velox/connectors/filesystem/FileSystemConfig.h b/velox/connectors/filesystem/FileSystemConfig.h index 97b4bedddcbc..f53bb78cde5b 100644 --- a/velox/connectors/filesystem/FileSystemConfig.h +++ b/velox/connectors/filesystem/FileSystemConfig.h @@ -30,6 +30,12 @@ class FileSystemConfig { static constexpr const char* kPath = "path"; static constexpr const char* kFormat = "format"; + /// The supported file format to write + const std::unordered_map supportedFileFormats = { + {"csv", dwio::common::FileFormat::TEXT}, + {"parquet", dwio::common::FileFormat::PARQUET}, + {"orc", dwio::common::FileFormat::ORC} + }; const std::string getPath(); const dwio::common::FileFormat getFormat(); @@ -79,15 +85,7 @@ class FileSystemWriteConfig : public FileSystemConfig { "partition.time-extractor.timestamp-pattern"; /// The default value of max partitions per writer. static constexpr const int32_t defaultMaxPartitionsPerWriter = 65535; - /// The supported file format to write - const std::unordered_map supportedFileFormats = { - {"csv", dwio::common::FileFormat::TEXT}, - {"parquet", dwio::common::FileFormat::PARQUET}, - {"orc", dwio::common::FileFormat::ORC} - }; - const std::string getPath(); - const dwio::common::FileFormat getFormat(); const bool allowNullPartitionKeys() { return false; }