From 35fdcd928187f452bacb3efbe08596d857a95902 Mon Sep 17 00:00:00 2001 From: dentiny Date: Fri, 12 Sep 2025 22:17:57 +0000 Subject: [PATCH] persist cloud config --- src/moonlink/src/lib.rs | 2 +- src/moonlink/src/storage.rs | 1 + src/moonlink/src/storage/iceberg.rs | 4 +- .../storage/iceberg/aws_security_config.rs | 27 --------- .../storage/iceberg/cloud_security_config.rs | 57 +++++++++++++++++++ .../storage/iceberg/iceberg_table_config.rs | 18 ++++-- .../src/config_utils.rs | 34 +++++++++-- .../src/sqlite/sql/create_secrets.sql | 4 +- .../src/sqlite/sqlite_metadata_store.rs | 9 ++- 9 files changed, 108 insertions(+), 48 deletions(-) delete mode 100644 src/moonlink/src/storage/iceberg/aws_security_config.rs create mode 100644 src/moonlink/src/storage/iceberg/cloud_security_config.rs diff --git a/src/moonlink/src/lib.rs b/src/moonlink/src/lib.rs index 379185628..48e50d940 100644 --- a/src/moonlink/src/lib.rs +++ b/src/moonlink/src/lib.rs @@ -24,7 +24,7 @@ pub use storage::{ MooncakeTable, MooncakeTableConfig, MoonlinkSecretType, MoonlinkTableConfig, MoonlinkTableSecret, ObjectStorageCache, ObjectStorageCacheConfig, PersistentWalMetadata, SnapshotReadOutput, StorageConfig, TableEventManager, TableManager, TableSnapshotStatus, - TableStatusReader, WalConfig, WalManager, WalTransactionState, + TableStatusReader, WalConfig, WalManager, WalTransactionState, AwsSecurityConfig, CloudSecurityConfig, }; pub use table_handler::TableHandler; pub use table_handler_timer::TableHandlerTimer; diff --git a/src/moonlink/src/storage.rs b/src/moonlink/src/storage.rs index 39953e128..d5aac6acc 100644 --- a/src/moonlink/src/storage.rs +++ b/src/moonlink/src/storage.rs @@ -37,6 +37,7 @@ pub use mooncake_table::table_event_manager::TableEventManager; pub use mooncake_table::table_secret::{ SecretEntry as MoonlinkTableSecret, SecretType as MoonlinkSecretType, }; +pub use iceberg::cloud_security_config::{CloudSecurityConfig, AwsSecurityConfig}; pub use mooncake_table::table_status::TableSnapshotStatus; pub use mooncake_table::table_status_reader::TableStatusReader; pub use mooncake_table::MooncakeTable; diff --git a/src/moonlink/src/storage/iceberg.rs b/src/moonlink/src/storage/iceberg.rs index b8eee63b1..9e111c4bf 100644 --- a/src/moonlink/src/storage/iceberg.rs +++ b/src/moonlink/src/storage/iceberg.rs @@ -21,6 +21,7 @@ pub(super) mod parquet_utils; pub(super) mod puffin_utils; pub(super) mod puffin_writer_proxy; mod table_update_proxy; +pub mod cloud_security_config; #[cfg(feature = "catalog-glue")] pub(super) mod glue_catalog; @@ -44,9 +45,6 @@ mod s3_test_utils; #[cfg(test)] mod gcs_test_utils; -#[cfg(feature = "catalog-glue")] -mod aws_security_config; - #[cfg(test)] mod tests; diff --git a/src/moonlink/src/storage/iceberg/aws_security_config.rs b/src/moonlink/src/storage/iceberg/aws_security_config.rs deleted file mode 100644 index f8bf1e50e..000000000 --- a/src/moonlink/src/storage/iceberg/aws_security_config.rs +++ /dev/null @@ -1,27 +0,0 @@ -/// AWS security config. -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Deserialize, PartialEq, Serialize)] -pub struct AwsSecurityConfig { - #[serde(rename = "access_key_id")] - #[serde(default)] - pub access_key_id: String, - - #[serde(rename = "security_access_key")] - #[serde(default)] - pub security_access_key: String, - - #[serde(rename = "region")] - #[serde(default)] - pub region: String, -} - -impl std::fmt::Debug for AwsSecurityConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AwsSecurityConfig") - .field("access_key_id", &"xxxxx") - .field("security_access_key", &"xxxx") - .field("region", &self.region) - .finish() - } -} diff --git a/src/moonlink/src/storage/iceberg/cloud_security_config.rs b/src/moonlink/src/storage/iceberg/cloud_security_config.rs new file mode 100644 index 000000000..3e245d0b1 --- /dev/null +++ b/src/moonlink/src/storage/iceberg/cloud_security_config.rs @@ -0,0 +1,57 @@ +/// Cloud vendor security config. +/// +/// AWS security config. +use serde::{Deserialize, Serialize}; + +use crate::MoonlinkTableSecret; + +#[derive(Clone, Deserialize, PartialEq, Serialize)] +pub struct AwsSecurityConfig { + #[serde(rename = "access_key_id")] + #[serde(default)] + pub access_key_id: String, + + #[serde(rename = "security_access_key")] + #[serde(default)] + pub security_access_key: String, + + #[serde(rename = "region")] + #[serde(default)] + pub region: String, +} + +impl std::fmt::Debug for AwsSecurityConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AwsSecurityConfig") + .field("access_key_id", &"xxxxx") + .field("security_access_key", &"xxxx") + .field("region", &self.region) + .finish() + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub enum CloudSecurityConfig { + #[cfg(feature = "storage-s3")] + Aws(AwsSecurityConfig), +} + +impl CloudSecurityConfig { + /// Extract security metadata entry from the current cloud security config. + pub fn extract_security_metadata_entry(&self) -> Option { + match self { + #[cfg(feature = "storage-s3")] + CloudSecurityConfig::Aws(aws_security_config) => { + Some(MoonlinkTableSecret { + secret_type: crate::MoonlinkSecretType::S3, + key_id: aws_security_config.access_key_id.clone(), + secret: aws_security_config.security_access_key.clone(), + project: None, + endpoint: None, + region: None, + }) + } + _ => None + } + } +} diff --git a/src/moonlink/src/storage/iceberg/iceberg_table_config.rs b/src/moonlink/src/storage/iceberg/iceberg_table_config.rs index 79b30ce28..788ff9df9 100644 --- a/src/moonlink/src/storage/iceberg/iceberg_table_config.rs +++ b/src/moonlink/src/storage/iceberg/iceberg_table_config.rs @@ -1,5 +1,5 @@ #[cfg(feature = "catalog-glue")] -use crate::storage::iceberg::aws_security_config::AwsSecurityConfig; +use crate::{storage::iceberg::aws_security_config::AwsSecurityConfig, CloudSecurityConfig}; use crate::{storage::filesystem::accessor_config::AccessorConfig, StorageConfig}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -33,15 +33,15 @@ pub struct RestCatalogConfig { pub props: HashMap, } +#[cfg(feature = "catalog-glue")] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct GlueCatalogConfig { /// ======================== /// AWS security configs. /// ======================== /// - #[cfg(feature = "catalog-glue")] - #[serde(rename = "aws_security_config")] - pub aws_security_config: AwsSecurityConfig, + #[serde(rename = "cloud_secret_config")] + pub cloud_secret_config: CloudSecurityConfig, /// ======================== /// Glue properties @@ -114,6 +114,16 @@ impl IcebergCatalogConfig { } } + pub fn get_cloud_secret_config(&self) -> Option { + match self { + #[cfg(feature = "catalog-glue")] + IcebergCatalogConfig::Glue { glue_catalog_config } => { + Some(glue_catalog_config.cloud_secret_config.clone()) + } + _ => None + } + } + #[cfg(feature = "catalog-rest")] pub fn get_rest_catalog_config(&self) -> Option { if let IcebergCatalogConfig::Rest { diff --git a/src/moonlink_metadata_store/src/config_utils.rs b/src/moonlink_metadata_store/src/config_utils.rs index dba99408d..1c98863d7 100644 --- a/src/moonlink_metadata_store/src/config_utils.rs +++ b/src/moonlink_metadata_store/src/config_utils.rs @@ -10,6 +10,20 @@ use serde::{Deserialize, Serialize}; #[cfg(any(feature = "storage-gcs", feature = "storage-s3"))] use url::Url; + +/// Table config entry to persist. +#[derive(Clone, Debug)] +pub(crate) struct TableConfigEntry { + /// Serialized json format for [`MoonlinkTableConfigForPersistence`]. + pub(crate) serialized_moonlink_table_config: serde_json::Value, + /// Cloud vendor secret. + pub(crate) cloud_vendor_secret: Option, + /// Iceberg data access secret. + pub(crate) iceberg_data_access_secret: Option, + /// WAL secret. + pub(crate) wal_secret: Option, +} + /// Struct for iceberg table config. /// Notice it's a subset of [`IcebergTableConfig`] since we want to keep things persisted minimum. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -143,15 +157,12 @@ impl MoonlinkTableConfigForPersistence { /// TODO(hjiang): Handle namespace better. /// Returns: /// - serialized json value of the persisted config +/// - cloud secret config /// - iceberg secret entry /// - wal secret entry pub(crate) fn parse_moonlink_table_config( moonlink_table_config: MoonlinkTableConfig, -) -> Result<( - serde_json::Value, - Option, - Option, -)> { +) -> Result { // Serialize mooncake table config. let iceberg_config = moonlink_table_config.iceberg_table_config; let wal_config = moonlink_table_config.wal_table_config; @@ -180,6 +191,11 @@ pub(crate) fn parse_moonlink_table_config( let config_json = serde_json::to_value(&persisted)?; // Extract table secret entry. + let cloud_secret_config = if let Some(cloud_secret_config) = iceberg_config.metadata_accessor_config.get_cloud_secret_config() { + cloud_secret_config.extract_security_metadata_entry() + } else { + None + }; let iceberg_secret_entry = iceberg_config .metadata_accessor_config .get_file_catalog_accessor_config() @@ -189,7 +205,13 @@ pub(crate) fn parse_moonlink_table_config( .get_accessor_config() .extract_security_metadata_entry(); - Ok((config_json, iceberg_secret_entry, wal_secret_entry)) + let table_config_entry = TableConfigEntry { + serialized_moonlink_table_config: config_json, + cloud_vendor_secret: cloud_secret_config, + iceberg_data_access_secret: iceberg_secret_entry, + wal_secret: wal_secret_entry, + }; + Ok(table_config_entry) } /// Recover filesystem config from persisted config and secret. diff --git a/src/moonlink_metadata_store/src/sqlite/sql/create_secrets.sql b/src/moonlink_metadata_store/src/sqlite/sql/create_secrets.sql index bb9523335..7bf64746d 100644 --- a/src/moonlink_metadata_store/src/sqlite/sql/create_secrets.sql +++ b/src/moonlink_metadata_store/src/sqlite/sql/create_secrets.sql @@ -3,8 +3,8 @@ CREATE TABLE secrets ( id SERIAL PRIMARY KEY, -- Unique row identifier "database" TEXT, -- column store database name "table" TEXT, -- column store table name - usage_type TEXT CHECK (usage_type IN ('iceberg', 'wal')), -- Purpose of secret: 'iceberg' or 'wal'. - storage_provider TEXT CHECK (storage_provider IN ('s3', 'gcs')), -- One of ('s3', 'gcs') + usage_type TEXT CHECK (usage_type IN ('cloud', 'iceberg_storage', 'wal_storage')), + provider TEXT CHECK (storage_provider IN ('aws', 's3', 'gcs')), key_id TEXT, secret TEXT, project TEXT, -- (optional) diff --git a/src/moonlink_metadata_store/src/sqlite/sqlite_metadata_store.rs b/src/moonlink_metadata_store/src/sqlite/sqlite_metadata_store.rs index df80783ca..a0f51435b 100644 --- a/src/moonlink_metadata_store/src/sqlite/sqlite_metadata_store.rs +++ b/src/moonlink_metadata_store/src/sqlite/sqlite_metadata_store.rs @@ -42,13 +42,13 @@ impl MetadataStoreTrait for SqliteMetadataStore { t.src_table_name, t.src_table_uri, t.config, - s_ice.storage_provider AS iceberg_storage_provider, + s_ice.provider AS iceberg_storage_provider, s_ice.key_id AS iceberg_key_id, s_ice.secret AS iceberg_secret, s_ice.endpoint AS iceberg_endpoint, s_ice.region AS iceberg_region, s_ice.project AS iceberg_project, - s_wal.storage_provider AS wal_storage_provider, + s_wal.provider AS wal_storage_provider, s_wal.key_id AS wal_key_id, s_wal.secret AS wal_secret, s_wal.endpoint AS wal_endpoint, @@ -126,9 +126,8 @@ impl MetadataStoreTrait for SqliteMetadataStore { src_table_uri: &str, moonlink_table_config: MoonlinkTableConfig, ) -> Result<()> { - let (serialized_config, iceberg_secret, wal_secret) = - config_utils::parse_moonlink_table_config(moonlink_table_config)?; - let serialized_config = serde_json::to_string(&serialized_config)?; + let table_config_entry = config_utils::parse_moonlink_table_config(moonlink_table_config)?; + let serialized_config = serde_json::to_string(&table_config_entry.serialized_config)?; // Create metadata tables if it doesn't exist. let sqlite_conn = SqliteConnWrapper::new(&self.database_uri).await?;