diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 275d71fce..3d448c860 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -72,6 +72,7 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + update/expire_snapshots.cc update/update_properties.cc util/bucket_util.cc util/conversions.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index c139c66b5..fb62975f3 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -94,6 +94,7 @@ iceberg_sources = files( 'transform.cc', 'transform_function.cc', 'type.cc', + 'update/expire_snapshots.cc', 'update/update_properties.cc', 'util/bucket_util.cc', 'util/conversions.cc', @@ -202,6 +203,7 @@ subdir('catalog') subdir('expression') subdir('manifest') subdir('row') +subdir('update') subdir('util') if get_option('tests').enabled() diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h index 8370db141..981c908d7 100644 --- a/src/iceberg/pending_update.h +++ b/src/iceberg/pending_update.h @@ -69,4 +69,44 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { PendingUpdate() = default; }; +/// \brief Template class for type-safe table metadata changes using builder pattern +/// +/// PendingUpdateTyped extends PendingUpdate with a type-safe Apply() method that +/// returns the specific result type for each operation. Subclasses implement +/// specific types of table updates such as schema changes, property updates, or +/// snapshot-producing operations like appends and deletes. +/// +/// Apply() can be used to validate and inspect the uncommitted changes before +/// committing. Commit() applies the changes and commits them to the table. +/// +/// \tparam T The type of result returned by Apply() +template +class ICEBERG_EXPORT PendingUpdateTyped : public PendingUpdate { + public: + ~PendingUpdateTyped() override = default; + + /// \brief Apply the pending changes and return the uncommitted result (typed version) + /// + /// This does not result in a permanent update. + /// + /// \return the uncommitted changes that would be committed, or an error: + /// - ValidationFailed: if pending changes cannot be applied + /// - InvalidArgument: if pending changes are conflicting + virtual Result ApplyTyped() = 0; + + /// \brief Apply the pending changes (base class override) + /// + /// This implementation calls ApplyTyped() and discards the result. + Status Apply() override { + auto result = ApplyTyped(); + if (!result.has_value()) { + return std::unexpected(result.error()); + } + return {}; + } + + protected: + PendingUpdateTyped() = default; +}; + } // namespace iceberg diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 458711255..46fee10fc 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -26,6 +26,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" +#include "iceberg/update/expire_snapshots.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -117,6 +118,10 @@ std::unique_ptr Table::NewTransaction() const { throw NotImplemented("Table::NewTransaction is not implemented"); } +std::unique_ptr Table::NewExpireSnapshots() { + return std::make_unique(this); +} + const std::shared_ptr& Table::io() const { return io_; } std::unique_ptr Table::NewScan() const { diff --git a/src/iceberg/table.h b/src/iceberg/table.h index df3a0c32e..7f3b73734 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -121,6 +121,11 @@ class ICEBERG_EXPORT Table { /// \return a pointer to the new Transaction virtual std::unique_ptr NewTransaction() const; + /// \brief Create a new expire snapshots operation for this table + /// + /// \return a unique pointer to the new ExpireSnapshots operation + virtual std::unique_ptr NewExpireSnapshots(); + /// \brief Returns a FileIO to read and write table data and metadata files const std::shared_ptr& io() const; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 9892e3d4f..5c17a26e7 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -90,6 +90,8 @@ add_iceberg_test(expression_test predicate_test.cc strict_metrics_evaluator_test.cc) +add_iceberg_test(update_test SOURCES update/expire_snapshots_test.cc) + add_iceberg_test(json_serde_test SOURCES json_internal_test.cc diff --git a/src/iceberg/test/update/expire_snapshots_test.cc b/src/iceberg/test/update/expire_snapshots_test.cc new file mode 100644 index 000000000..e83bf31be --- /dev/null +++ b/src/iceberg/test/update/expire_snapshots_test.cc @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include +#include + +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// Basic API tests for ExpireSnapshots +// Full functional tests will be added when the implementation is complete + +TEST(ExpireSnapshotsTest, FluentApiChaining) { + // Test that the fluent API works correctly with method chaining + ExpireSnapshots expire(nullptr); + + auto& result = + expire.ExpireSnapshotId(123).ExpireOlderThan(1000000).RetainLast(5).SetCleanupLevel( + CleanupLevel::kMetadataOnly); + + // Verify that chaining returns the same object + EXPECT_EQ(&result, &expire); +} + +TEST(ExpireSnapshotsTest, ExpireSnapshotId) { + ExpireSnapshots expire(nullptr); + expire.ExpireSnapshotId(123); + + // Currently returns NotImplemented - this test will be expanded + // when the actual implementation is added + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, ExpireMultipleSnapshotIds) { + ExpireSnapshots expire(nullptr); + expire.ExpireSnapshotId(100).ExpireSnapshotId(200).ExpireSnapshotId(300); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, ExpireOlderThan) { + ExpireSnapshots expire(nullptr); + int64_t timestamp = 1609459200000; // 2021-01-01 00:00:00 UTC + expire.ExpireOlderThan(timestamp); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, RetainLast) { + ExpireSnapshots expire(nullptr); + expire.RetainLast(10); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, DeleteWithCallback) { + ExpireSnapshots expire(nullptr); + std::vector deleted_files; + + expire.DeleteWith( + [&deleted_files](std::string_view file) { deleted_files.emplace_back(file); }); + + // Currently returns NotImplemented + auto result = expire.Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, CleanupLevelNone) { + ExpireSnapshots expire(nullptr); + expire.SetCleanupLevel(CleanupLevel::kNone); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, CleanupLevelMetadataOnly) { + ExpireSnapshots expire(nullptr); + expire.SetCleanupLevel(CleanupLevel::kMetadataOnly); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, CleanupLevelAll) { + ExpireSnapshots expire(nullptr); + expire.SetCleanupLevel(CleanupLevel::kAll); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, CombinedConfiguration) { + ExpireSnapshots expire(nullptr); + int64_t timestamp = 1609459200000; + + expire.ExpireSnapshotId(100) + .ExpireSnapshotId(200) + .ExpireOlderThan(timestamp) + .RetainLast(5) + .SetCleanupLevel(CleanupLevel::kMetadataOnly); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, CommitNotImplemented) { + ExpireSnapshots expire(nullptr); + expire.ExpireSnapshotId(123); + + // Currently returns NotImplemented + auto result = expire.Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +TEST(ExpireSnapshotsTest, ApplyNotImplemented) { + ExpireSnapshots expire(nullptr); + expire.ExpireOlderThan(1000000); + + // Currently returns NotImplemented + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 72ba5182c..d8f1735b9 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -43,6 +43,11 @@ class ICEBERG_EXPORT Transaction { /// \return a new AppendFiles virtual std::shared_ptr NewAppend() = 0; + /// \brief Create a new expire snapshots operation for this transaction + /// + /// \return a unique pointer to the new ExpireSnapshots operation + virtual std::unique_ptr NewExpireSnapshots() = 0; + /// \brief Apply the pending changes from all actions and commit /// /// This method applies all pending data operations and metadata updates in the diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0e1867f60..bd1cafbd4 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -164,6 +164,9 @@ template class PendingUpdateTyped; class UpdateProperties; +enum class CleanupLevel; +class ExpireSnapshots; + /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. /// ---------------------------------------------------------------------------- diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc new file mode 100644 index 000000000..adb055893 --- /dev/null +++ b/src/iceberg/update/expire_snapshots.cc @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" + +namespace iceberg { + +ExpireSnapshots::ExpireSnapshots(Table* table) : table_(table) {} + +ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + snapshot_ids_to_expire_.push_back(snapshot_id); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) { + expire_older_than_ms_ = timestamp_millis; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) { + retain_last_ = num_snapshots; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::DeleteWith( + std::function delete_func) { + delete_func_ = std::move(delete_func); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::SetCleanupLevel(CleanupLevel level) { + cleanup_level_ = level; + return *this; +} + +Result>> ExpireSnapshots::ApplyTyped() { + // Placeholder implementation - full snapshot expiration logic to be implemented + return NotImplemented("ExpireSnapshots::ApplyTyped() is not yet implemented"); +} + +Status ExpireSnapshots::Commit() { + // Placeholder implementation - full commit logic to be implemented + return NotImplemented("ExpireSnapshots::Commit() is not yet implemented"); +} + +} // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h new file mode 100644 index 000000000..c2067ab5b --- /dev/null +++ b/src/iceberg/update/expire_snapshots.h @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/expire_snapshots.h +/// API for removing old snapshots from a table + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/pending_update.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cleanup level for snapshot expiration +/// +/// Controls which files are deleted during snapshot expiration. +enum class CleanupLevel { + /// Skip all file cleanup, only remove snapshot metadata + kNone, + /// Clean up only metadata files (manifests, manifest lists, statistics), + /// retain data files + kMetadataOnly, + /// Clean up both metadata and data files (default) + kAll, +}; + +/// \brief API for removing old snapshots from a table +/// +/// ExpireSnapshots accumulates snapshot deletions and commits the new snapshot +/// list to the table. This API does not allow deleting the current snapshot. +/// +/// When committing, changes are applied to the latest table metadata. Commit +/// conflicts are resolved by applying the changes to the new latest metadata +/// and reattempting the commit. +/// +/// Manifest files that are no longer used by valid snapshots will be deleted. +/// Data files that were deleted by snapshots that are expired will be deleted. +/// DeleteWith() can be used to pass an alternative deletion method. +/// +/// Apply() returns a list of the snapshots that will be removed (preview mode). +/// +/// Example usage: +/// \code +/// table.ExpireSnapshots() +/// .ExpireOlderThan(timestampMillis) +/// .RetainLast(5) +/// .Commit(); +/// \endcode +class ICEBERG_EXPORT ExpireSnapshots + : public PendingUpdateTyped>> { + public: + /// \brief Constructor for ExpireSnapshots operation + /// + /// \param table The table to expire snapshots from + explicit ExpireSnapshots(Table* table); + ~ExpireSnapshots() override = default; + + /// \brief Expire a specific snapshot identified by id + /// + /// Marks a specific snapshot for removal. This method can be called multiple + /// times to expire multiple snapshots. Snapshots marked by this method will + /// be expired even if they would be retained by RetainLast(). + /// + /// \param snapshot_id ID of the snapshot to expire + /// \return Reference to this for method chaining + ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id); + + /// \brief Expire all snapshots older than the given timestamp + /// + /// Sets a timestamp threshold - all snapshots created before this time will + /// be expired (unless retained by RetainLast()). + /// + /// \param timestamp_millis Timestamp in milliseconds since epoch + /// \return Reference to this for method chaining + ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis); + + /// \brief Retain the most recent ancestors of the current snapshot + /// + /// If a snapshot would be expired because it is older than the expiration + /// timestamp, but is one of the num_snapshots most recent ancestors of the + /// current state, it will be retained. This will not prevent snapshots + /// explicitly identified by ExpireSnapshotId() from expiring. + /// + /// This may keep more than num_snapshots ancestors if snapshots are added + /// concurrently. This may keep less than num_snapshots ancestors if the + /// current table state does not have that many. + /// + /// \param num_snapshots The number of snapshots to retain + /// \return Reference to this for method chaining + ExpireSnapshots& RetainLast(int num_snapshots); + + /// \brief Set a custom file deletion callback + /// + /// Passes an alternative delete implementation that will be used for + /// manifests and data files. If this method is not called, unnecessary + /// manifests and data files will still be deleted using the default method. + /// + /// Manifest files that are no longer used by valid snapshots will be deleted. + /// Data files that were deleted by snapshots that are expired will be deleted. + /// + /// \param delete_func Callback function that will be called for each file to delete + /// \return Reference to this for method chaining + ExpireSnapshots& DeleteWith(std::function delete_func); + + /// \brief Configure the cleanup level for expired files + /// + /// This method provides fine-grained control over which files are cleaned up + /// during snapshot expiration. + /// + /// Use CleanupLevel::kMetadataOnly when data files are shared across tables or + /// when using procedures like add-files that may reference the same data files. + /// + /// Use CleanupLevel::kNone when data and metadata files may be more efficiently + /// removed using a distributed framework through the actions API. + /// + /// \param level The cleanup level to use for expired snapshots + /// \return Reference to this for method chaining + ExpireSnapshots& SetCleanupLevel(CleanupLevel level); + + /// \brief Apply the pending changes and return the uncommitted result + /// + /// This does not result in a permanent update. + /// + /// \return the list of snapshots that would be expired, or an error: + /// - ValidationFailed: if pending changes cannot be applied + Result>> ApplyTyped() override; + + /// \brief Apply and commit the pending changes to the table + /// + /// Changes are committed by calling the underlying table's commit operation. + /// + /// Once the commit is successful, the updated table will be refreshed. + /// + /// \return Status::OK if the commit was successful, or an error: + /// - ValidationFailed: if update cannot be applied to current metadata + /// - CommitFailed: if update cannot be committed due to conflicts + Status Commit() override; + + // Non-copyable, movable (inherited from PendingUpdate) + ExpireSnapshots(const ExpireSnapshots&) = delete; + ExpireSnapshots& operator=(const ExpireSnapshots&) = delete; + + private: + Table* table_; + std::vector snapshot_ids_to_expire_; + std::optional expire_older_than_ms_; + std::optional retain_last_; + std::optional> delete_func_; + CleanupLevel cleanup_level_ = CleanupLevel::kAll; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build new file mode 100644 index 000000000..f0ecd3e92 --- /dev/null +++ b/src/iceberg/update/meson.build @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +install_headers(['expire_snapshots.h'], subdir: 'iceberg/update')