From b6fdc19f21bade07aa564c837aa88f09a6083100 Mon Sep 17 00:00:00 2001 From: Gu Haiyan Date: Wed, 25 Feb 2026 21:22:59 +0800 Subject: [PATCH] cp file size --- Makefile | 2 +- .../storage_adapters/abfs/AbfsFileSystem.cpp | 15 ++-- .../hive/storage_adapters/abfs/AbfsReadFile.h | 3 +- .../abfs/tests/AbfsFileSystemTest.cpp | 69 ++++++++++++------- .../storage_adapters/gcs/GCSFileSystem.cpp | 11 ++- .../storage_adapters/s3fs/S3FileSystem.cpp | 5 +- .../hive/storage_adapters/s3fs/S3ReadFile.cpp | 13 +++- .../hive/storage_adapters/s3fs/S3ReadFile.h | 3 +- 8 files changed, 82 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index c62c23170..7088de6d4 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ BUILD_USER ?= BUILD_CHANNEL ?= # Use commas to separate multiple file systems, such as `hdfs,tos` ENABLE_HDFS ?= True -ENABLE_S3 ?= False +ENABLE_S3 ?= True USE_ARROW_HDFS ?= True ENABLE_ASAN ?= False LDB_BUILD ?= False diff --git a/bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index 6329f5454..d64768599 100644 --- a/bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -30,6 +30,7 @@ #include "bolt/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" #include "bolt/common/file/File.h" +#include "bolt/common/file/FileSystems.h" #include "bolt/connectors/hive/HiveConfig.h" #include "bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" #include "bolt/connectors/hive/storage_adapters/abfs/AbfsUtil.h" @@ -79,7 +80,11 @@ class AbfsReadFile::Impl { connectStr_, fileSystem_, fileName_)); } - void initialize() { + void initialize(const FileOptions& options) { + // If caller provided file size, trust it and avoid metadata call. + if (options.fileSize > 0) { + length_ = options.fileSize; + } if (length_ != -1) { return; } @@ -188,8 +193,8 @@ AbfsReadFile::AbfsReadFile( impl_ = std::make_shared(path, connectStr); } -void AbfsReadFile::initialize() { - return impl_->initialize(); +void AbfsReadFile::initialize(const FileOptions& options) { + return impl_->initialize(options); } std::string_view @@ -265,10 +270,10 @@ std::string AbfsFileSystem::name() const { std::unique_ptr AbfsFileSystem::openFileForRead( std::string_view path, - const FileOptions& /*unused*/) { + const FileOptions& options) { auto abfsfile = std::make_unique( std::string(path), impl_->connectionString(std::string(path))); - abfsfile->initialize(); + abfsfile->initialize(options); return abfsfile; } } // namespace bytedance::bolt::filesystems::abfs diff --git a/bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h b/bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h index 7d5e0fde2..460c48751 100644 --- a/bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h +++ b/bolt/connectors/hive/storage_adapters/abfs/AbfsReadFile.h @@ -31,13 +31,14 @@ #include #include #include "bolt/common/file/File.h" +#include "bolt/common/file/FileSystems.h" #include "bolt/connectors/hive/storage_adapters/abfs/AbfsUtil.h" namespace bytedance::bolt::filesystems::abfs { class AbfsReadFile final : public ReadFile { public: explicit AbfsReadFile(const std::string& path, const std::string& connectStr); - void initialize(); + void initialize(const FileOptions& options); std::string_view pread(uint64_t offset, uint64_t length, void* buf) const final; diff --git a/bolt/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/bolt/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index 310a8651a..3080e1230 100644 --- a/bolt/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/bolt/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -43,6 +43,8 @@ #include #include using namespace bytedance::bolt; +using namespace bytedance::bolt::filesystems; +using namespace bytedance::bolt::filesystems::abfs; using ::bytedance::bolt::common::Region; @@ -147,17 +149,40 @@ TEST_F(AbfsFileSystemTest, readFile) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); - auto readFile = abfs->openFileForRead(fullFilePath); + AbfsFileSystem abfs{hiveConfig}; + auto readFile = abfs.openFileForRead(fullFilePath); + readData(readFile.get()); +} + +TEST_F(AbfsFileSystemTest, openFileForReadWithOptions) { + auto hiveConfig = AbfsFileSystemTest::hiveConfig( + {{"fs.azure.account.key.test.dfs.core.windows.net", + azuriteServer->connectionStr()}}); + AbfsFileSystem abfs{hiveConfig}; + FileOptions options; + options.fileSize = 15 + kOneMB; + auto readFile = abfs.openFileForRead(fullFilePath, options); readData(readFile.get()); } +TEST_F(AbfsFileSystemTest, openFileForReadWithInvalidOptions) { + auto hiveConfig = AbfsFileSystemTest::hiveConfig( + {{"fs.azure.account.key.test.dfs.core.windows.net", + azuriteServer->connectionStr()}}); + AbfsFileSystem abfs{hiveConfig}; + FileOptions options; + options.fileSize = -kOneMB; + BOLT_ASSERT_THROW( + abfs.openFileForRead(fullFilePath, options), + "File size must be non-negative"); +} + TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) { std::atomic startThreads = false; auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); + AbfsFileSystem abfs{hiveConfig}; std::vector threads; std::mt19937 generator(std::random_device{}()); @@ -172,7 +197,7 @@ TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) { } std::this_thread::sleep_for( std::chrono::microseconds(sleepTimesInMicroseconds[index])); - auto readFile = abfs->openFileForRead(fullFilePath); + auto readFile = abfs.openFileForRead(fullFilePath); readData(readFile.get()); }); threads.emplace_back(std::move(thread)); @@ -190,8 +215,8 @@ TEST_F(AbfsFileSystemTest, missingFile) { azuriteServer->connectionStr()}}); const std::string abfsFile = bytedance::bolt::filesystems::test::AzuriteABFSEndpoint + "test.txt"; - auto abfs = std::make_shared(hiveConfig); - auto readFile = abfs->openFileForRead(abfsFile); + AbfsFileSystem abfs{hiveConfig}; + auto readFile = abfs.openFileForRead(abfsFile); FAIL() << "Expected BoltException"; } catch (BoltException const& err) { EXPECT_TRUE(err.message().find("404") != std::string::npos); @@ -202,42 +227,42 @@ TEST_F(AbfsFileSystemTest, openFileForWriteNotImplemented) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); + AbfsFileSystem abfs{hiveConfig}; BOLT_ASSERT_THROW( - abfs->openFileForWrite(fullFilePath), "write for abfs not implemented"); + abfs.openFileForWrite(fullFilePath), "write for abfs not implemented"); } TEST_F(AbfsFileSystemTest, renameNotImplemented) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); + AbfsFileSystem abfs{hiveConfig}; BOLT_ASSERT_THROW( - abfs->rename("text", "text2"), "rename for abfs not implemented"); + abfs.rename("text", "text2"), "rename for abfs not implemented"); } TEST_F(AbfsFileSystemTest, removeNotImplemented) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); - BOLT_ASSERT_THROW(abfs->remove("text"), "remove for abfs not implemented"); + AbfsFileSystem abfs{hiveConfig}; + BOLT_ASSERT_THROW(abfs.remove("text"), "remove for abfs not implemented"); } TEST_F(AbfsFileSystemTest, existsNotImplemented) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); - BOLT_ASSERT_THROW(abfs->exists("text"), "exists for abfs not implemented"); + AbfsFileSystem abfs{hiveConfig}; + BOLT_ASSERT_THROW(abfs.exists("text"), "exists for abfs not implemented"); } TEST_F(AbfsFileSystemTest, listNotImplemented) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); - BOLT_ASSERT_THROW(abfs->list("dir"), "list for abfs not implemented"); + AbfsFileSystem abfs{hiveConfig}; + BOLT_ASSERT_THROW(abfs.list("dir"), "list for abfs not implemented"); } TEST_F(AbfsFileSystemTest, mkdirNotImplemented) { @@ -245,24 +270,22 @@ TEST_F(AbfsFileSystemTest, mkdirNotImplemented) { {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); auto abfs = std::make_shared(hiveConfig); - BOLT_ASSERT_THROW(abfs->mkdir("dir"), "mkdir for abfs not implemented"); + BOLT_ASSERT_THROW(abfs.mkdir("dir"), "mkdir for abfs not implemented"); } TEST_F(AbfsFileSystemTest, rmdirNotImplemented) { auto hiveConfig = AbfsFileSystemTest::hiveConfig( {{"fs.azure.account.key.test.dfs.core.windows.net", azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); - BOLT_ASSERT_THROW(abfs->rmdir("dir"), "rmdir for abfs not implemented"); + AbfsFileSystem abfs{hiveConfig}; + BOLT_ASSERT_THROW(abfs.rmdir("dir"), "rmdir for abfs not implemented"); } TEST_F(AbfsFileSystemTest, credNotFOund) { const std::string abfsFile = std::string("abfs://test@test1.dfs.core.windows.net/test"); auto hiveConfig = AbfsFileSystemTest::hiveConfig({}); - auto abfs = - std::make_shared( - hiveConfig); + AbfsFileSystem abfs{hiveConfig}; BOLT_ASSERT_THROW( - abfs->openFileForRead(abfsFile), "Failed to find storage credentials"); + abfs.openFileForRead(abfsFile), "Failed to find storage credentials"); } diff --git a/bolt/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp b/bolt/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp index f771e1b48..dc364e631 100644 --- a/bolt/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp +++ b/bolt/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp @@ -32,6 +32,7 @@ #include "bolt/common/base/Exceptions.h" #include "bolt/common/config/Config.h" #include "bolt/common/file/File.h" +#include "bolt/common/file/FileSystems.h" #include "bolt/connectors/hive/HiveConfig.h" #include "bolt/connectors/hive/storage_adapters/gcs/GCSUtil.h" #include "bolt/core/QueryConfig.h" @@ -83,7 +84,11 @@ class GCSReadFile final : public ReadFile { // Gets the length of the file. // Checks if there are any issues reading the file. - void initialize() { + void initialize(const filesystems::FileOptions& options) { + // If caller provided a trusted file size, use it to avoid metadata call. + if (options.fileSize > 0) { + length_ = options.fileSize; + } // Make it a no-op if invoked twice. if (length_ != -1) { return; @@ -330,10 +335,10 @@ void GCSFileSystem::initializeClient() { std::unique_ptr GCSFileSystem::openFileForRead( std::string_view path, - const FileOptions& /*unused*/) { + const FileOptions& options) { const auto gcspath = gcsPath(path); auto gcsfile = std::make_unique(gcspath, impl_->getClient()); - gcsfile->initialize(); + gcsfile->initialize(options); return gcsfile; } diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index fc15391e7..8969ab96a 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -32,6 +32,7 @@ #include "bolt/common/base/StatsReporter.h" #include "bolt/common/config/Config.h" #include "bolt/common/file/File.h" +#include "bolt/common/file/FileSystems.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3Config.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3Counters.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h" @@ -483,10 +484,10 @@ std::string S3FileSystem::getLogPrefix() const { std::unique_ptr S3FileSystem::openFileForRead( std::string_view s3Path, - const FileOptions& /*unused*/) { + const FileOptions& options) { const auto path = getPath(s3Path); auto s3file = std::make_unique(path, impl_->s3Client()); - s3file->initialize(); + s3file->initialize(options); return s3file; } diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.cpp b/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.cpp index 63d5b4c8e..936373120 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.cpp @@ -61,7 +61,14 @@ class S3ReadFile::Impl { // Gets the length of the file. // Checks if there are any issues reading the file. - void initialize() { + void initialize(const filesystems::FileOptions& options) { + // Use provided file size if available to bypass HEAD request. + if (options.fileSize > 0) { + length_ = options.fileSize; + } + if (length_ != -1) { + return; + } Aws::S3::Model::HeadObjectRequest request; request.SetBucket(awsString(bucket_)); request.SetKey(awsString(key_)); @@ -172,8 +179,8 @@ S3ReadFile::S3ReadFile(std::string_view path, Aws::S3::S3Client* client) { S3ReadFile::~S3ReadFile() = default; -void S3ReadFile::initialize() { - return impl_->initialize(); +void S3ReadFile::initialize(const filesystems::FileOptions& options) { + return impl_->initialize(options); } std::string_view S3ReadFile::pread(uint64_t offset, uint64_t length, void* buf) diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h b/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h index 398a9896a..3d218ade8 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3ReadFile.h @@ -31,6 +31,7 @@ #pragma once #include "bolt/common/file/File.h" +#include "bolt/common/file/FileSystems.h" namespace Aws::S3 { class S3Client; @@ -66,7 +67,7 @@ class S3ReadFile : public ReadFile { return 72 << 20; } - void initialize(); + void initialize(const filesystems::FileOptions& options); private: void preadInternal(uint64_t offset, uint64_t length, char* position) const;