Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/moonlink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use storage::{
MooncakeTable, MooncakeTableConfig, MoonlinkSecretType, MoonlinkTableConfig,
MoonlinkTableSecret, ObjectStorageCache, ObjectStorageCacheConfig, PersistentWalMetadata,
SnapshotReadOutput, StorageConfig, TableEventManager, TableManager, TableSnapshotStatus,
TableStatusReader, WalConfig, WalManager, WalTransactionState,
TableStatusReader, WalConfig, WalManager, WalTransactionState, AwsSecurityConfig, CloudSecurityConfig,
};
pub use table_handler::TableHandler;
pub use table_handler_timer::TableHandlerTimer;
Expand Down
1 change: 1 addition & 0 deletions src/moonlink/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use mooncake_table::table_event_manager::TableEventManager;
pub use mooncake_table::table_secret::{
SecretEntry as MoonlinkTableSecret, SecretType as MoonlinkSecretType,
};
pub use iceberg::cloud_security_config::{CloudSecurityConfig, AwsSecurityConfig};
pub use mooncake_table::table_status::TableSnapshotStatus;
pub use mooncake_table::table_status_reader::TableStatusReader;
pub use mooncake_table::MooncakeTable;
Expand Down
4 changes: 1 addition & 3 deletions src/moonlink/src/storage/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(super) mod parquet_utils;
pub(super) mod puffin_utils;
pub(super) mod puffin_writer_proxy;
mod table_update_proxy;
pub mod cloud_security_config;

#[cfg(feature = "catalog-glue")]
pub(super) mod glue_catalog;
Expand All @@ -44,9 +45,6 @@ mod s3_test_utils;
#[cfg(test)]
mod gcs_test_utils;

#[cfg(feature = "catalog-glue")]
mod aws_security_config;

#[cfg(test)]
mod tests;

Expand Down
27 changes: 0 additions & 27 deletions src/moonlink/src/storage/iceberg/aws_security_config.rs

This file was deleted.

57 changes: 57 additions & 0 deletions src/moonlink/src/storage/iceberg/cloud_security_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/// Cloud vendor security config.
///
/// AWS security config.
use serde::{Deserialize, Serialize};

use crate::MoonlinkTableSecret;

#[derive(Clone, Deserialize, PartialEq, Serialize)]
pub struct AwsSecurityConfig {
#[serde(rename = "access_key_id")]
#[serde(default)]
pub access_key_id: String,

#[serde(rename = "security_access_key")]
#[serde(default)]
pub security_access_key: String,

#[serde(rename = "region")]
#[serde(default)]
pub region: String,
}

impl std::fmt::Debug for AwsSecurityConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AwsSecurityConfig")
.field("access_key_id", &"xxxxx")
.field("security_access_key", &"xxxx")
.field("region", &self.region)
.finish()
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum CloudSecurityConfig {
#[cfg(feature = "storage-s3")]
Aws(AwsSecurityConfig),
}

impl CloudSecurityConfig {
/// Extract security metadata entry from the current cloud security config.
pub fn extract_security_metadata_entry(&self) -> Option<MoonlinkTableSecret> {
match self {
#[cfg(feature = "storage-s3")]
CloudSecurityConfig::Aws(aws_security_config) => {
Some(MoonlinkTableSecret {
secret_type: crate::MoonlinkSecretType::S3,
key_id: aws_security_config.access_key_id.clone(),
secret: aws_security_config.security_access_key.clone(),
project: None,
endpoint: None,
region: None,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The region from AwsSecurityConfig is not being used when creating the MoonlinkTableSecret. The region field is being set to None, but it should be populated from aws_security_config.region. This could lead to issues when interacting with AWS services that require a region.

Suggested change
region: None,
region: Some(aws_security_config.region.clone()),

})
}
_ => None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a wildcard _ => None makes this match statement less robust. If new variants are added to CloudSecurityConfig (e.g., for another cloud provider), they will be silently ignored and return None instead of causing a compile error for an unhandled variant. It's better to have an exhaustive match to catch this at compile time and ensure all variants are handled correctly.

}
}
}
18 changes: 14 additions & 4 deletions src/moonlink/src/storage/iceberg/iceberg_table_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "catalog-glue")]
use crate::storage::iceberg::aws_security_config::AwsSecurityConfig;
use crate::{storage::iceberg::aws_security_config::AwsSecurityConfig, CloudSecurityConfig};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This use statement is incorrect. It refers to storage::iceberg::aws_security_config::AwsSecurityConfig, but the aws_security_config module has been removed. Furthermore, AwsSecurityConfig is not directly used in this file. The import should only be for CloudSecurityConfig.

Suggested change
use crate::{storage::iceberg::aws_security_config::AwsSecurityConfig, CloudSecurityConfig};
use crate::CloudSecurityConfig;

use crate::{storage::filesystem::accessor_config::AccessorConfig, StorageConfig};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -33,15 +33,15 @@ pub struct RestCatalogConfig {
pub props: HashMap<String, String>,
}

#[cfg(feature = "catalog-glue")]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GlueCatalogConfig {
/// ========================
/// AWS security configs.
/// ========================
///
#[cfg(feature = "catalog-glue")]
#[serde(rename = "aws_security_config")]
pub aws_security_config: AwsSecurityConfig,
#[serde(rename = "cloud_secret_config")]
pub cloud_secret_config: CloudSecurityConfig,

/// ========================
/// Glue properties
Expand Down Expand Up @@ -114,6 +114,16 @@ impl IcebergCatalogConfig {
}
}

pub fn get_cloud_secret_config(&self) -> Option<CloudSecurityConfig> {
match self {
#[cfg(feature = "catalog-glue")]
IcebergCatalogConfig::Glue { glue_catalog_config } => {
Some(glue_catalog_config.cloud_secret_config.clone())
}
_ => None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a wildcard _ => None makes this match statement less robust. If new variants are added to IcebergCatalogConfig that contain cloud secrets, they will be silently ignored instead of causing a compile error for an unhandled variant. It's better to have an exhaustive match to ensure all cases are handled.

}
}

#[cfg(feature = "catalog-rest")]
pub fn get_rest_catalog_config(&self) -> Option<RestCatalogConfig> {
if let IcebergCatalogConfig::Rest {
Expand Down
34 changes: 28 additions & 6 deletions src/moonlink_metadata_store/src/config_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ use serde::{Deserialize, Serialize};
#[cfg(any(feature = "storage-gcs", feature = "storage-s3"))]
use url::Url;


/// Table config entry to persist.
#[derive(Clone, Debug)]
pub(crate) struct TableConfigEntry {
/// Serialized json format for [`MoonlinkTableConfigForPersistence`].
pub(crate) serialized_moonlink_table_config: serde_json::Value,
/// Cloud vendor secret.
pub(crate) cloud_vendor_secret: Option<MoonlinkTableSecret>,
/// Iceberg data access secret.
pub(crate) iceberg_data_access_secret: Option<MoonlinkTableSecret>,
/// WAL secret.
pub(crate) wal_secret: Option<MoonlinkTableSecret>,
}

/// Struct for iceberg table config.
/// Notice it's a subset of [`IcebergTableConfig`] since we want to keep things persisted minimum.
#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -143,15 +157,12 @@ impl MoonlinkTableConfigForPersistence {
/// TODO(hjiang): Handle namespace better.
/// Returns:
/// - serialized json value of the persisted config
/// - cloud secret config
/// - iceberg secret entry
/// - wal secret entry
pub(crate) fn parse_moonlink_table_config(
moonlink_table_config: MoonlinkTableConfig,
) -> Result<(
serde_json::Value,
Option<MoonlinkTableSecret>,
Option<MoonlinkTableSecret>,
)> {
) -> Result<TableConfigEntry> {
// Serialize mooncake table config.
let iceberg_config = moonlink_table_config.iceberg_table_config;
let wal_config = moonlink_table_config.wal_table_config;
Expand Down Expand Up @@ -180,6 +191,11 @@ pub(crate) fn parse_moonlink_table_config(
let config_json = serde_json::to_value(&persisted)?;

// Extract table secret entry.
let cloud_secret_config = if let Some(cloud_secret_config) = iceberg_config.metadata_accessor_config.get_cloud_secret_config() {
cloud_secret_config.extract_security_metadata_entry()
} else {
None
};
Comment on lines +194 to +198

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This if let ... else block can be written more concisely and idiomatically using Option::and_then.

    let cloud_secret_config = iceberg_config
        .metadata_accessor_config
        .get_cloud_secret_config()
        .and_then(|config| config.extract_security_metadata_entry());

let iceberg_secret_entry = iceberg_config
.metadata_accessor_config
.get_file_catalog_accessor_config()
Expand All @@ -189,7 +205,13 @@ pub(crate) fn parse_moonlink_table_config(
.get_accessor_config()
.extract_security_metadata_entry();

Ok((config_json, iceberg_secret_entry, wal_secret_entry))
let table_config_entry = TableConfigEntry {
serialized_moonlink_table_config: config_json,
cloud_vendor_secret: cloud_secret_config,
iceberg_data_access_secret: iceberg_secret_entry,
wal_secret: wal_secret_entry,
};
Ok(table_config_entry)
}

/// Recover filesystem config from persisted config and secret.
Expand Down
4 changes: 2 additions & 2 deletions src/moonlink_metadata_store/src/sqlite/sql/create_secrets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ CREATE TABLE secrets (
id SERIAL PRIMARY KEY, -- Unique row identifier
"database" TEXT, -- column store database name
"table" TEXT, -- column store table name
usage_type TEXT CHECK (usage_type IN ('iceberg', 'wal')), -- Purpose of secret: 'iceberg' or 'wal'.
storage_provider TEXT CHECK (storage_provider IN ('s3', 'gcs')), -- One of ('s3', 'gcs')
usage_type TEXT CHECK (usage_type IN ('cloud', 'iceberg_storage', 'wal_storage')),
provider TEXT CHECK (storage_provider IN ('aws', 's3', 'gcs')),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a typo in the CHECK constraint. It refers to storage_provider, but the column has been renamed to provider. This will cause a SQL error when creating the table or inserting data.

    provider TEXT CHECK (provider IN ('aws', 's3', 'gcs')),

key_id TEXT,
secret TEXT,
project TEXT, -- (optional)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ impl MetadataStoreTrait for SqliteMetadataStore {
t.src_table_name,
t.src_table_uri,
t.config,
s_ice.storage_provider AS iceberg_storage_provider,
s_ice.provider AS iceberg_storage_provider,
s_ice.key_id AS iceberg_key_id,
s_ice.secret AS iceberg_secret,
s_ice.endpoint AS iceberg_endpoint,
s_ice.region AS iceberg_region,
s_ice.project AS iceberg_project,
s_wal.storage_provider AS wal_storage_provider,
s_wal.provider AS wal_storage_provider,
Comment on lines +45 to +51

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The LEFT JOIN conditions for s_ice and s_wal use usage_type = 'iceberg' and usage_type = 'wal' respectively. However, in create_secrets.sql, these usage_type values have been changed to 'iceberg_storage' and 'wal_storage'. This query will now fail to retrieve any secrets for iceberg and WAL. The usage_type values in the ON clauses must be updated to match the new schema.

s_wal.key_id AS wal_key_id,
s_wal.secret AS wal_secret,
s_wal.endpoint AS wal_endpoint,
Expand Down Expand Up @@ -126,9 +126,8 @@ impl MetadataStoreTrait for SqliteMetadataStore {
src_table_uri: &str,
moonlink_table_config: MoonlinkTableConfig,
) -> Result<()> {
let (serialized_config, iceberg_secret, wal_secret) =
config_utils::parse_moonlink_table_config(moonlink_table_config)?;
let serialized_config = serde_json::to_string(&serialized_config)?;
let table_config_entry = config_utils::parse_moonlink_table_config(moonlink_table_config)?;
let serialized_config = serde_json::to_string(&table_config_entry.serialized_config)?;
Comment on lines +129 to +130

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This refactoring is incomplete and has introduced several critical issues that will cause compile-time and runtime errors:

  1. The variables iceberg_secret and wal_secret, used later in the function, are no longer defined after this change.
  2. The new cloud_vendor_secret is extracted into table_config_entry but is never persisted to the database.
  3. The subsequent INSERT statements into the secrets table are incorrect as they use:
    • Hardcoded usage_type values ('iceberg', 'wal') that are now invalid according to the updated schema. They should be 'iceberg_storage' and 'wal_storage'.
    • The old column name storage_provider which has been renamed to provider.

The entire function needs to be fixed to correctly extract secrets from table_config_entry and persist them using the correct SQL.


// Create metadata tables if it doesn't exist.
let sqlite_conn = SqliteConnWrapper::new(&self.database_uri).await?;
Expand Down