Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion velox/connectors/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ velox_add_library(
FileSystemConfig.cpp
FileSystemConnector.cpp
FileSystemInsertTableHandle.cpp
FileSystemDataSink.cpp)
FileSystemDataSink.cpp
FileSystemIndexSource.cpp
FileSystemColumnHandle.cpp)

velox_link_libraries(velox_filesystem_connector velox_common_io velox_connector)

Expand Down
62 changes: 62 additions & 0 deletions velox/connectors/filesystem/FileSystemColumnHandle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/filesystem/FileSystemColumnHandle.h"
#include <boost/algorithm/string.hpp>

namespace facebook::velox::connector::filesystem {

std::string FileSystemColumnHandle::toString() const {
std::string s = hive::HiveColumnHandle::toString();
boost::algorithm::replace_all(
s, "HiveColumnHandle", "FileSystemColumnHandle");
return s;
}

folly::dynamic FileSystemColumnHandle::serialize() const {
folly::dynamic obj = ColumnHandle::serializeBase("FileSystemColumnHandle");
obj["fileSystemColumnHandleName"] = hive::HiveColumnHandle::name();
obj["columnType"] = columnTypeName(hive::HiveColumnHandle::columnType());
obj["dataType"] = hive::HiveColumnHandle::dataType()->serialize();
folly::dynamic requiredSubfields = folly::dynamic::array;
const std::vector<common::Subfield>& subFields =
hive::HiveColumnHandle::requiredSubfields();
for (const auto& subfield : subFields) {
requiredSubfields.push_back(subfield.toString());
}
obj["requiredSubfields"] = requiredSubfields;
return obj;
}

ColumnHandlePtr FileSystemColumnHandle::create(const folly::dynamic& obj) {
auto name = obj["fileSystemColumnHandleName"].asString();
auto columnType = columnTypeFromName(obj["columnType"].asString());
auto dataType = ISerializable::deserialize<Type>(obj["dataType"]);
const auto& arr = obj["requiredSubfields"];
std::vector<common::Subfield> requiredSubfields;
requiredSubfields.reserve(arr.size());
for (auto& s : arr) {
requiredSubfields.emplace_back(s.asString());
}
return std::make_shared<FileSystemColumnHandle>(
name, columnType, dataType, std::move(requiredSubfields));
}

void FileSystemColumnHandle::registerSerDe() {
auto& registry = DeserializationRegistryForSharedPtr();
registry.Register("FileSystemColumnHandle", FileSystemColumnHandle::create);
}
} // namespace facebook::velox::connector::filesystem
40 changes: 40 additions & 0 deletions velox/connectors/filesystem/FileSystemColumnHandle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <folly/dynamic.h>
#include "velox/connectors/hive/TableHandle.h"
#include "velox/type/Subfield.h"

namespace facebook::velox::connector::filesystem {
class FileSystemColumnHandle : public hive::HiveColumnHandle {
public:
FileSystemColumnHandle(
const std::string& name,
ColumnType columnType,
TypePtr dataType,
std::vector<common::Subfield> requiredSubfields = {})
: hive::HiveColumnHandle(name, columnType, dataType, dataType, {}, {}) {}

std::string toString() const;

folly::dynamic serialize() const override;

static ColumnHandlePtr create(const folly::dynamic& obj);

static void registerSerDe();
};
} // namespace facebook::velox::connector::filesystem
25 changes: 20 additions & 5 deletions velox/connectors/filesystem/FileSystemConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/

#include "velox/connectors/filesystem/FileSystemConfig.h"
#include <dwio/common/Options.h>
#include "velox/dwio/common/Options.h"

namespace facebook::velox::connector::filesystem {

template <typename T, bool throwException>
const T FileSystemWriteConfig::checkAndGetConfigValue(
const T FileSystemConfig::checkAndGetConfigValue(
const std::string& configKey,
const T& defaultValue) const {
std::optional<T> configValue =
Expand All @@ -39,7 +39,7 @@ const T FileSystemWriteConfig::checkAndGetConfigValue(
}
}

const dwio::common::FileFormat FileSystemWriteConfig::getFormat() {
const dwio::common::FileFormat FileSystemConfig::getFormat() {
const std::string format = checkAndGetConfigValue<std::string, false>(kFormat, "");
if (supportedFileFormats.find(format) != supportedFileFormats.end()) {
return supportedFileFormats.at(format);
Expand All @@ -48,7 +48,7 @@ const dwio::common::FileFormat FileSystemWriteConfig::getFormat() {
}
}

const std::string FileSystemWriteConfig::getPath() {
const std::string FileSystemConfig::getPath() {
return checkAndGetConfigValue<std::string, false>(kPath, "");
}

Expand Down Expand Up @@ -102,4 +102,19 @@ const int32_t FileSystemWriteConfig::getFileRollingSize() {
}
}

} // namespace facebook::velox::connector::filesystem
const std::string FileSystemReadConfig::getFieldDelimiter() {
return checkAndGetConfigValue<std::string, false>(
kTextFormatFieldDelimiter, defaultTextFormatFieldDelimiter);
}

const uint64_t FileSystemReadConfig::getMaxReadRows() {
return checkAndGetConfigValue<uint64_t, false>(
kMaxReadRows, defaultMaxReadRows);
}

const uint64_t FileSystemReadConfig::getMaxReadBytes() {
return checkAndGetConfigValue<uint64_t, false>(
kMaxReadBytes, defaultMaxReadBytes);
}

} // namespace facebook::velox::connector::filesystem
89 changes: 62 additions & 27 deletions velox/connectors/filesystem/FileSystemConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,53 @@ namespace facebook::velox::connector::filesystem {

using ConfigPtr = std::shared_ptr<const config::ConfigBase>;

class FileSystemWriteConfig {
class FileSystemConfig {
public:
FileSystemWriteConfig(const ConfigPtr& config) : config_(config) {}
FileSystemConfig(const ConfigPtr& config) : config_(config) {}

static constexpr const char* kPath = "path";
/// The config key fo format
static constexpr const char* kFormat = "format";
/// The supported file format to write
const std::unordered_map<std::string, dwio::common::FileFormat> supportedFileFormats = {
{"csv", dwio::common::FileFormat::TEXT},
{"parquet", dwio::common::FileFormat::PARQUET},
{"orc", dwio::common::FileFormat::ORC}
};

const std::string getPath();
const dwio::common::FileFormat getFormat();

const bool exists(const std::string& configKey) {
return config_ && config_->valueExists(configKey);
}

const ConfigPtr& config() {
return config_;
}

template <typename T>
const std::shared_ptr<T> updateAndGetAllConfigs(
const std::unordered_map<std::string, std::string>& configs) const {
std::unordered_map<std::string, std::string> rawConfigs =
config_->rawConfigsCopy();
rawConfigs.insert(configs.begin(), configs.end());
ConfigPtr newConfig =
std::make_shared<const config::ConfigBase>(std::move(rawConfigs));
return std::make_shared<T>(newConfig);
}

protected:
ConfigPtr config_;

template <typename T, bool throwException>
const T checkAndGetConfigValue(const std::string& configKey, const T& defaultValue)
const;
};

class FileSystemWriteConfig : public FileSystemConfig {
public:
FileSystemWriteConfig(const ConfigPtr& config) : FileSystemConfig(config) {}

static constexpr const char* kFileRollingInterval =
"sink.rolling-policy.rollover-interval";
static constexpr const char* kFileRollingSize =
Expand All @@ -45,15 +85,7 @@ class FileSystemWriteConfig {
"partition.time-extractor.timestamp-pattern";
/// The default value of max partitions per writer.
static constexpr const int32_t defaultMaxPartitionsPerWriter = 65535;
/// The supported file format to write
const std::unordered_map<std::string, dwio::common::FileFormat> supportedFileFormats = {
{"csv", dwio::common::FileFormat::TEXT},
{"parquet", dwio::common::FileFormat::PARQUET},
{"orc", dwio::common::FileFormat::ORC}
};

const std::string getPath();
const dwio::common::FileFormat getFormat();
const bool allowNullPartitionKeys() {
return false;
}
Expand All @@ -74,6 +106,24 @@ class FileSystemWriteConfig {
const bool flushOnWrite() {
return true;
}
};

class FileSystemReadConfig : public FileSystemConfig {
public:
FileSystemReadConfig(const ConfigPtr& config) : FileSystemConfig(config) {}

static constexpr const char* kTextFormatFieldDelimiter =
"csv.field.delimiter";
static constexpr const char* kMaxReadRows = "max.read.rows";
static constexpr const char* kMaxReadBytes = "max.read.bytes";

static constexpr const char* defaultTextFormatFieldDelimiter = ",";
static constexpr const uint64_t defaultMaxReadRows = 10000;
static constexpr const uint64_t defaultMaxReadBytes = 1024;

const std::string getFieldDelimiter();
const uint64_t getMaxReadRows();
const uint64_t getMaxReadBytes();
const bool exists(const std::string& configKey) {
return config_ && config_->valueExists(configKey);
}
Expand All @@ -82,22 +132,7 @@ class FileSystemWriteConfig {
return config_;
}

template <typename T>
const std::shared_ptr<T> updateAndGetAllConfigs(
const std::unordered_map<std::string, std::string>& configs) const {
std::unordered_map<std::string, std::string> rawConfigs =
config_->rawConfigsCopy();
rawConfigs.insert(configs.begin(), configs.end());
ConfigPtr newConfig =
std::make_shared<const config::ConfigBase>(std::move(rawConfigs));
return std::make_shared<T>(newConfig);
}

private:
ConfigPtr config_;

template <typename T, bool throwException>
const T checkAndGetConfigValue(const std::string& configKey, const T& defaultValue)
const;
};
} // namespace facebook::velox::connector::filesystem
} // namespace facebook::velox::connector::filesystem
78 changes: 76 additions & 2 deletions velox/connectors/filesystem/FileSystemConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/config/Config.h"
#include "velox/connectors/filesystem/FileSystemConnector.h"
#include "velox/connectors/filesystem/FileSystemIndexSource.h"
#include "velox/connectors/filesystem/FileSystemIndexTableHandle.h"
#include "velox/connectors/filesystem/FileSystemInsertTableHandle.h"
#include "velox/connectors/filesystem/FileSystemDataSink.h"
#include <folly/executors/CPUThreadPoolExecutor.h>

namespace facebook::velox::connector::filesystem {

Expand Down Expand Up @@ -51,4 +54,75 @@ std::unique_ptr<DataSink> FileSystemConnector::createDataSink(
insertTableHandle->parititonIndexes(),
insertTableHandle->partitionKeys());
}
} // namespace facebook::velox::connector::filesystem

core::TypedExprPtr toLookupJoinConditionExpr(
const std::vector<std::shared_ptr<core::IndexLookupCondition>>&
joinConditions,
const std::shared_ptr<FileSystemIndexTableHandle>& tableHandle,
const RowTypePtr& inputType) {
if (joinConditions.empty()) {
return nullptr;
}
const auto& keyType = tableHandle->keyType();
std::vector<core::TypedExprPtr> conditionExprs;
conditionExprs.reserve(joinConditions.size());
for (const auto& condition : joinConditions) {
auto indexColumnExpr = std::make_shared<core::FieldAccessTypedExpr>(
keyType->findChild(condition->key->name()), condition->key->name());
if (auto inCondition =
std::dynamic_pointer_cast<core::InIndexLookupCondition>(
condition)) {
conditionExprs.push_back(std::make_shared<const core::CallTypedExpr>(
BOOLEAN(),
std::vector<core::TypedExprPtr>{
inCondition->list, std::move(indexColumnExpr)},
"contains"));
continue;
}
if (auto betweenCondition =
std::dynamic_pointer_cast<core::BetweenIndexLookupCondition>(
condition)) {
conditionExprs.push_back(std::make_shared<const core::CallTypedExpr>(
BOOLEAN(),
std::vector<core::TypedExprPtr>{
std::move(indexColumnExpr),
betweenCondition->lower,
betweenCondition->upper},
"between"));
continue;
}
VELOX_FAIL("Invalid index join condition: {}", condition->toString());
}
return std::make_shared<core::CallTypedExpr>(
BOOLEAN(), conditionExprs, "and");
}

std::shared_ptr<IndexSource> FileSystemConnector::createIndexSource(
const RowTypePtr& inputType,
size_t numJoinKeys,
const std::vector<std::shared_ptr<core::IndexLookupCondition>>&
joinConditions,
const RowTypePtr& outputType,
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
const std::shared_ptr<FileSystemIndexTableHandle> fsTableHandle =
std::dynamic_pointer_cast<FileSystemIndexTableHandle>(tableHandle);

std::shared_ptr<folly::Executor> executor;
if (fsTableHandle->asyncLookup()) {
executor = std::make_shared<folly::CPUThreadPoolExecutor>(1);
}
return std::make_shared<FileSystemIndexSource>(
inputType,
outputType,
numJoinKeys,
toLookupJoinConditionExpr(joinConditions, fsTableHandle, inputType),
fsTableHandle,
connectorQueryCtx,
executor);
}

} // namespace facebook::velox::connector::filesystem
Loading