Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ BUILD_USER ?=
BUILD_CHANNEL ?=
# Use commas to separate multiple file systems, such as `hdfs,tos`
ENABLE_HDFS ?= True
ENABLE_S3 ?= False
ENABLE_S3 ?= True
USE_ARROW_HDFS ?= True
ENABLE_ASAN ?= False
LDB_BUILD ?= False
Expand Down
15 changes: 10 additions & 5 deletions bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#include "bolt/common/file/File.h"
#include "bolt/common/file/FileSystems.h"
#include "bolt/connectors/hive/HiveConfig.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsUtil.h"
Expand Down Expand Up @@ -79,7 +80,11 @@ class AbfsReadFile::Impl {
connectStr_, fileSystem_, fileName_));
}

void initialize() {
void initialize(const FileOptions& options) {
// If caller provided file size, trust it and avoid metadata call.
if (options.fileSize > 0) {
length_ = options.fileSize;
}
if (length_ != -1) {
return;
}
Expand Down Expand Up @@ -188,8 +193,8 @@ AbfsReadFile::AbfsReadFile(
impl_ = std::make_shared<Impl>(path, connectStr);
}

void AbfsReadFile::initialize() {
return impl_->initialize();
void AbfsReadFile::initialize(const FileOptions& options) {
return impl_->initialize(options);
}

std::string_view
Expand Down Expand Up @@ -265,10 +270,10 @@ std::string AbfsFileSystem::name() const {

std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& options) {
auto abfsfile = std::make_unique<AbfsReadFile>(
std::string(path), impl_->connectionString(std::string(path)));
abfsfile->initialize();
abfsfile->initialize(options);
return abfsfile;
}
} // namespace bytedance::bolt::filesystems::abfs
3 changes: 2 additions & 1 deletion bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@
#include <folly/executors/ThreadedExecutor.h>
#include <folly/futures/Future.h>
#include "bolt/common/file/File.h"
#include "bolt/common/file/FileSystems.h"
#include "bolt/connectors/hive/storage_adapters/abfs/AbfsUtil.h"
namespace bytedance::bolt::filesystems::abfs {
class AbfsReadFile final : public ReadFile {
public:
explicit AbfsReadFile(const std::string& path, const std::string& connectStr);

void initialize();
void initialize(const FileOptions& options);

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include <atomic>
#include <random>
using namespace bytedance::bolt;
using namespace bytedance::bolt::filesystems;
using namespace bytedance::bolt::filesystems::abfs;

using ::bytedance::bolt::common::Region;

Expand Down Expand Up @@ -147,17 +149,40 @@ TEST_F(AbfsFileSystemTest, readFile) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
auto readFile = abfs->openFileForRead(fullFilePath);
AbfsFileSystem abfs{hiveConfig};
auto readFile = abfs.openFileForRead(fullFilePath);
readData(readFile.get());
}

TEST_F(AbfsFileSystemTest, openFileForReadWithOptions) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
AbfsFileSystem abfs{hiveConfig};
FileOptions options;
options.fileSize = 15 + kOneMB;
auto readFile = abfs.openFileForRead(fullFilePath, options);
readData(readFile.get());
}

TEST_F(AbfsFileSystemTest, openFileForReadWithInvalidOptions) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
AbfsFileSystem abfs{hiveConfig};
FileOptions options;
options.fileSize = -kOneMB;
BOLT_ASSERT_THROW(
abfs.openFileForRead(fullFilePath, options),
"File size must be non-negative");
}

TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) {
std::atomic<bool> startThreads = false;
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};

std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
Expand All @@ -172,7 +197,7 @@ TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) {
}
std::this_thread::sleep_for(
std::chrono::microseconds(sleepTimesInMicroseconds[index]));
auto readFile = abfs->openFileForRead(fullFilePath);
auto readFile = abfs.openFileForRead(fullFilePath);
readData(readFile.get());
});
threads.emplace_back(std::move(thread));
Expand All @@ -190,8 +215,8 @@ TEST_F(AbfsFileSystemTest, missingFile) {
azuriteServer->connectionStr()}});
const std::string abfsFile =
bytedance::bolt::filesystems::test::AzuriteABFSEndpoint + "test.txt";
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
auto readFile = abfs->openFileForRead(abfsFile);
AbfsFileSystem abfs{hiveConfig};
auto readFile = abfs.openFileForRead(abfsFile);
FAIL() << "Expected BoltException";
} catch (BoltException const& err) {
EXPECT_TRUE(err.message().find("404") != std::string::npos);
Expand All @@ -202,67 +227,65 @@ TEST_F(AbfsFileSystemTest, openFileForWriteNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(
abfs->openFileForWrite(fullFilePath), "write for abfs not implemented");
abfs.openFileForWrite(fullFilePath), "write for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, renameNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(
abfs->rename("text", "text2"), "rename for abfs not implemented");
abfs.rename("text", "text2"), "rename for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, removeNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
BOLT_ASSERT_THROW(abfs->remove("text"), "remove for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(abfs.remove("text"), "remove for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, existsNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
BOLT_ASSERT_THROW(abfs->exists("text"), "exists for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(abfs.exists("text"), "exists for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, listNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
BOLT_ASSERT_THROW(abfs->list("dir"), "list for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(abfs.list("dir"), "list for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, mkdirNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
BOLT_ASSERT_THROW(abfs->mkdir("dir"), "mkdir for abfs not implemented");
BOLT_ASSERT_THROW(abfs.mkdir("dir"), "mkdir for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, rmdirNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
BOLT_ASSERT_THROW(abfs->rmdir("dir"), "rmdir for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(abfs.rmdir("dir"), "rmdir for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, credNotFOund) {
const std::string abfsFile =
std::string("abfs://test@test1.dfs.core.windows.net/test");
auto hiveConfig = AbfsFileSystemTest::hiveConfig({});
auto abfs =
std::make_shared<bytedance::bolt::filesystems::abfs::AbfsFileSystem>(
hiveConfig);
AbfsFileSystem abfs{hiveConfig};
BOLT_ASSERT_THROW(
abfs->openFileForRead(abfsFile), "Failed to find storage credentials");
abfs.openFileForRead(abfsFile), "Failed to find storage credentials");
}
11 changes: 8 additions & 3 deletions bolt/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "bolt/common/base/Exceptions.h"
#include "bolt/common/config/Config.h"
#include "bolt/common/file/File.h"
#include "bolt/common/file/FileSystems.h"
#include "bolt/connectors/hive/HiveConfig.h"
#include "bolt/connectors/hive/storage_adapters/gcs/GCSUtil.h"
#include "bolt/core/QueryConfig.h"
Expand Down Expand Up @@ -83,7 +84,11 @@ class GCSReadFile final : public ReadFile {

// Gets the length of the file.
// Checks if there are any issues reading the file.
void initialize() {
void initialize(const filesystems::FileOptions& options) {
// If caller provided a trusted file size, use it to avoid metadata call.
if (options.fileSize > 0) {
length_ = options.fileSize;
}
// Make it a no-op if invoked twice.
if (length_ != -1) {
return;
Expand Down Expand Up @@ -330,10 +335,10 @@ void GCSFileSystem::initializeClient() {

std::unique_ptr<ReadFile> GCSFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& options) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GCSReadFile>(gcspath, impl_->getClient());
gcsfile->initialize();
gcsfile->initialize(options);
return gcsfile;
}

Expand Down
5 changes: 3 additions & 2 deletions bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "bolt/common/base/StatsReporter.h"
#include "bolt/common/config/Config.h"
#include "bolt/common/file/File.h"
#include "bolt/common/file/FileSystems.h"
#include "bolt/connectors/hive/storage_adapters/s3fs/S3Config.h"
#include "bolt/connectors/hive/storage_adapters/s3fs/S3Counters.h"
#include "bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h"
Expand Down Expand Up @@ -483,10 +484,10 @@ std::string S3FileSystem::getLogPrefix() const {

std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
std::string_view s3Path,
const FileOptions& /*unused*/) {
const FileOptions& options) {
const auto path = getPath(s3Path);
auto s3file = std::make_unique<S3ReadFile>(path, impl_->s3Client());
s3file->initialize();
s3file->initialize(options);
return s3file;
}

Expand Down
13 changes: 10 additions & 3 deletions bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ class S3ReadFile::Impl {

// Gets the length of the file.
// Checks if there are any issues reading the file.
void initialize() {
void initialize(const filesystems::FileOptions& options) {
// Use provided file size if available to bypass HEAD request.
if (options.fileSize > 0) {
length_ = options.fileSize;
}
if (length_ != -1) {
return;
}
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(awsString(bucket_));
request.SetKey(awsString(key_));
Expand Down Expand Up @@ -172,8 +179,8 @@ S3ReadFile::S3ReadFile(std::string_view path, Aws::S3::S3Client* client) {

S3ReadFile::~S3ReadFile() = default;

void S3ReadFile::initialize() {
return impl_->initialize();
void S3ReadFile::initialize(const filesystems::FileOptions& options) {
return impl_->initialize(options);
}

std::string_view S3ReadFile::pread(uint64_t offset, uint64_t length, void* buf)
Expand Down
3 changes: 2 additions & 1 deletion bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#pragma once

#include "bolt/common/file/File.h"
#include "bolt/common/file/FileSystems.h"

namespace Aws::S3 {
class S3Client;
Expand Down Expand Up @@ -66,7 +67,7 @@ class S3ReadFile : public ReadFile {
return 72 << 20;
}

void initialize();
void initialize(const filesystems::FileOptions& options);

private:
void preadInternal(uint64_t offset, uint64_t length, char* position) const;
Expand Down