diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 310d98dadd0..a62fa36a8d5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -41,6 +41,7 @@ import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater; import io.delta.kernel.internal.icebergcompat.IcebergUniversalFormatMetadataValidatorAndUpdater; import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater; +import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV3MetadataValidatorAndUpdater; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.metrics.SnapshotMetrics; import io.delta.kernel.internal.metrics.SnapshotQueryContext; @@ -481,6 +482,16 @@ protected Tuple2, Optional> validateAndUpdateProtoc newMetadata = icebergWriterCompatV1; } + Optional icebergWriterCompatV3 = + IcebergWriterCompatV3MetadataValidatorAndUpdater + .validateAndUpdateIcebergWriterCompatV3Metadata( + isCreateOrReplace, + newMetadata.orElse(baseMetadata), + newProtocol.orElse(baseProtocol)); + if (icebergWriterCompatV3.isPresent()) { + newMetadata = icebergWriterCompatV3; + } + // TODO: refactor this method to use a single validator and updater. Optional icebergCompatV2Metadata = IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata( diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java index 12d96dc68f3..7bdf298f571 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java @@ -72,7 +72,7 @@ public class AddFile extends RowBackedAction { /** * Utility to generate {@link AddFile} action instance from the given {@link DataFileStatus} and - * partition values. + * partition values. Use null DV descriptor when not present. */ public static AddFile convertDataFileStatus( StructType physicalSchema, @@ -81,6 +81,33 @@ public static AddFile convertDataFileStatus( Map partitionValues, boolean dataChange, Map tags) { + + return convertDataFileStatus( + physicalSchema, + tableRoot, + dataFileStatus, + partitionValues, + dataChange, + tags, + null); // DeletionVectorDescriptor is absent + } + + /** + * Utility to generate {@link AddFile} action instance from the given {@link DataFileStatus} and + * partition values. + */ + public static AddFile convertDataFileStatus( + StructType physicalSchema, + URI tableRoot, + DataFileStatus dataFileStatus, + Map partitionValues, + boolean dataChange, + Map tags, + DeletionVectorDescriptor deletionVectorDescriptor) { + + Optional deletionVectorOpt = + deletionVectorDescriptor != null ? Optional.of(deletionVectorDescriptor) : Optional.empty(); + Optional tagMapValue = !tags.isEmpty() ? 
Optional.of(VectorUtils.stringStringMapValue(tags)) : Optional.empty(); Row row = @@ -91,8 +118,8 @@ public static AddFile convertDataFileStatus( dataFileStatus.getSize(), dataFileStatus.getModificationTime(), dataChange, - Optional.empty(), // deletionVector - tagMapValue, // tags + deletionVectorOpt, + tagMapValue, Optional.empty(), // baseRowId Optional.empty(), // defaultRowCommitVersion dataFileStatus.getStatistics()); @@ -116,8 +143,6 @@ public static Row createAddFileRow( checkArgument(path != null, "path is not nullable"); checkArgument(partitionValues != null, "partitionValues is not nullable"); - // TODO - Add support for DeletionVectorDescriptor - checkArgument(!deletionVector.isPresent(), "DeletionVectorDescriptor is unsupported"); Map fieldMap = new HashMap<>(); fieldMap.put(FULL_SCHEMA.indexOf("path"), path); @@ -131,7 +156,20 @@ public static Row createAddFileRow( version -> fieldMap.put(FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version)); stats.ifPresent( stat -> fieldMap.put(FULL_SCHEMA.indexOf("stats"), stat.serializeAsJson(physicalSchema))); - + deletionVector.ifPresent( + dv -> { + Map dvFieldMap = new HashMap<>(); + StructType dvSchema = DeletionVectorDescriptor.READ_SCHEMA; + + dvFieldMap.put(dvSchema.indexOf("storageType"), dv.getStorageType()); + dvFieldMap.put(dvSchema.indexOf("pathOrInlineDv"), dv.getPathOrInlineDv()); + dv.getOffset().ifPresent(offset -> dvFieldMap.put(dvSchema.indexOf("offset"), offset)); + dvFieldMap.put(dvSchema.indexOf("sizeInBytes"), dv.getSizeInBytes()); + dvFieldMap.put(dvSchema.indexOf("cardinality"), dv.getCardinality()); + + Row dvRow = new GenericRow(dvSchema, dvFieldMap); + fieldMap.put(FULL_SCHEMA.indexOf("deletionVector"), dvRow); + }); return new GenericRow(FULL_SCHEMA, fieldMap); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java index bfe9df8ce48..08a73b36063 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java @@ -34,6 +34,7 @@ import io.delta.kernel.internal.data.TransactionStateRow; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater; +import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater; import io.delta.kernel.statistics.DataFileStatistics; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterable; @@ -107,6 +108,66 @@ public static Row generateIcebergCompatWriterV1AddAction( transactionState, fileStatus, partitionValues, dataChange, Collections.emptyMap()); } + /** + * Create an add action {@link Row} that can be passed to {@link Transaction#commit(Engine, + * CloseableIterable)} from an Iceberg add. + * + * @param transactionState the transaction state from the built transaction + * @param fileStatus the file status to create the add with (contains path, time, size, and stats) + * @param partitionValues the partition values for the add + * @param dataChange whether or not the add constitutes a dataChange (i.e. append vs. 
compaction) + * @param tags key-value metadata to be attached to the add action + * @return add action row that can be included in the transaction + * @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled + * @throws UnsupportedOperationException if maxRetries != 0 in the transaction + * @throws KernelException if stats are not present (required for icebergCompatV3) + * @throws UnsupportedOperationException if the table is partitioned (currently unsupported) + */ + public static Row generateIcebergCompatWriterV3AddAction( + Row transactionState, + DataFileStatus fileStatus, + Map partitionValues, + boolean dataChange, + Map tags) { + Map configuration = TransactionStateRow.getConfiguration(transactionState); + + /* ----- Validate that this is a valid usage of this API ----- */ + validateIcebergWriterCompatV3Enabled(configuration); + validateMaxRetriesSetToZero(transactionState); + + /* ----- Validate this is valid write given the table's protocol & configurations ----- */ + checkState( + TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(configuration), + "icebergCompatV3 not enabled despite icebergWriterCompatV3 enabled"); + // We require field `numRecords` when icebergCompatV3 is enabled + IcebergCompatV3MetadataValidatorAndUpdater.validateDataFileStatus(fileStatus); + + /* --- Validate and update partitionValues ---- */ + // Currently we don't support partitioned tables; fail here + blockPartitionedTables(transactionState, partitionValues); + + URI tableRoot = new Path(TransactionStateRow.getTablePath(transactionState)).toUri(); + // This takes care of relativizing the file path and serializing the file statistics + AddFile addFile = + AddFile.convertDataFileStatus( + TransactionStateRow.getPhysicalSchema(transactionState), + tableRoot, + fileStatus, + partitionValues, + dataChange, + tags); + return SingleAction.createAddFileSingleAction(addFile.toRow()); + } + + public static Row generateIcebergCompatWriterV3AddAction( + Row transactionState, + DataFileStatus fileStatus, + Map partitionValues, + boolean dataChange) { + return generateIcebergCompatWriterV3AddAction( + transactionState, fileStatus, partitionValues, dataChange, Collections.emptyMap()); + } + /** * Create a remove action {@link Row} that can be passed to {@link Transaction#commit(Engine, * CloseableIterable)} from an Iceberg remove. @@ -157,6 +218,56 @@ public static Row generateIcebergCompatWriterV1RemoveAction( return SingleAction.createRemoveFileSingleAction(removeFileRow); } + /** + * Create a remove action {@link Row} that can be passed to {@link Transaction#commit(Engine, + * CloseableIterable)} from an Iceberg remove. + * + * @param transactionState the transaction state from the built transaction + * @param fileStatus the file status to create the remove with (contains path, time, size, and + * stats) + * @param partitionValues the partition values for the remove + * @param dataChange whether or not the remove constitutes a dataChange (i.e. delete vs. 
+ * compaction) + * @return remove action row that can be committed to the transaction + * @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled + * @throws UnsupportedOperationException if maxRetries != 0 in the transaction + * @throws KernelException if the table is an append-only table and dataChange=true + * @throws UnsupportedOperationException if the table is partitioned (currently unsupported) + */ + public static Row generateIcebergCompatWriterV3RemoveAction( + Row transactionState, + DataFileStatus fileStatus, + Map partitionValues, + boolean dataChange) { + Map config = TransactionStateRow.getConfiguration(transactionState); + + /* ----- Validate that this is a valid usage of this API ----- */ + validateIcebergWriterCompatV3Enabled(config); + validateMaxRetriesSetToZero(transactionState); + + /* ----- Validate this is valid write given the table's protocol & configurations ----- */ + // We only allow removes with dataChange=false when appendOnly=true + if (dataChange && TableConfig.APPEND_ONLY_ENABLED.fromMetadata(config)) { + throw DeltaErrors.cannotModifyAppendOnlyTable( + TransactionStateRow.getTablePath(transactionState)); + } + + /* --- Validate and update partitionValues ---- */ + // Currently we don't support partitioned tables; fail here + blockPartitionedTables(transactionState, partitionValues); + + URI tableRoot = new Path(TransactionStateRow.getTablePath(transactionState)).toUri(); + // This takes care of relativizing the file path and serializing the file statistics + Row removeFileRow = + convertRemoveDataFileStatus( + TransactionStateRow.getPhysicalSchema(transactionState), + tableRoot, + fileStatus, + partitionValues, + dataChange); + return SingleAction.createRemoveFileSingleAction(removeFileRow); + } + ///////////////////// // Private helpers // ///////////////////// @@ -177,6 +288,21 @@ private static void validateIcebergWriterCompatV1Enabled(Map con } } + /** + * Validates that table feature `icebergWriterCompatV3` is enabled. We restrict usage of these + * APIs to require that this table feature is enabled to prevent any unsafe usage due to the table + * features that are blocked via `icebergWriterCompatV3`. + */ + private static void validateIcebergWriterCompatV3Enabled(Map config) { + if (!TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.fromMetadata(config)) { + throw new UnsupportedOperationException( + String.format( + "APIs within GenerateIcebergCompatActionUtils are only supported on tables with" + + " '%s' set to true", + TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey())); + } + } + /** * Throws an exception if `maxRetries` was not set to 0 in the transaction. 
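Since these helpers insist on `maxRetries = 0`, the following is a minimal, hedged sketch (not part of this patch) of how a connector might drive the new V3 entry points end to end. The `engine`, `schema`, `tablePath`, and `dataFileStatus` values are assumed to exist already; every call shown is either added in this patch or part of the existing Kernel transaction API.

// Sketch only. Relevant types: io.delta.kernel.{Table, Transaction, Operation},
// io.delta.kernel.data.Row, io.delta.kernel.utils.CloseableIterable,
// io.delta.kernel.internal.TableConfig,
// io.delta.kernel.internal.actions.GenerateIcebergCompatActionUtils,
// io.delta.kernel.internal.util.Utils.
Transaction txn =
    Table.forPath(engine, tablePath)
        .createTransactionBuilder(engine, "example-connector", Operation.WRITE)
        .withSchema(engine, schema)
        .withTableProperties(
            engine,
            Collections.singletonMap(
                TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey(), "true"))
        .withMaxRetries(0) // required by GenerateIcebergCompatActionUtils
        .build(engine);

Row txnState = txn.getTransactionState(engine);
Row addAction =
    GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
        txnState,
        dataFileStatus, // must carry numRecords statistics for icebergCompatV3
        Collections.emptyMap(), // partitionValues: only unpartitioned tables are supported
        true /* dataChange */);
txn.commit(
    engine,
    CloseableIterable.inMemoryIterable(
        Utils.toCloseableIterator(Collections.singletonList(addAction).iterator())));

A remove would be generated the same way via generateIcebergCompatWriterV3RemoveAction, subject to the append-only check described above.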
We restrict these APIs * to require `maxRetries = 0` since conflict resolution is not supported for operations other diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatMetadataValidatorAndUpdater.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatMetadataValidatorAndUpdater.java index 9f6daa187c6..76e88ffc613 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatMetadataValidatorAndUpdater.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatMetadataValidatorAndUpdater.java @@ -27,6 +27,7 @@ import io.delta.kernel.internal.util.SchemaIterable; import io.delta.kernel.types.*; import java.util.*; +import java.util.stream.Stream; /** * Contains interfaces and common utility classes performing the validations and updates necessary @@ -55,6 +56,49 @@ abstract class IcebergWriterCompatMetadataValidatorAndUpdater ColumnMapping.updateColumnMappingMetadataIfNeeded( inputContext.newMetadata, inputContext.isCreatingNewTable)); + /** + * Creates an IcebergCompatRequiredTablePropertyEnforcer for enabling a specific Iceberg + * compatibility version. The enforcer ensures the property is set to "true" and delegates + * validation to the appropriate metadata validator. + * + * @param tableConfigProperty the table configuration property to enforce + * @param postProcessor the version-specific validation and metadata update processor + * @return configured enforcer for the specified Iceberg compatibility version + */ + protected static IcebergCompatRequiredTablePropertyEnforcer createIcebergCompatEnforcer( + TableConfig tableConfigProperty, PostMetadataProcessor postProcessor) { + return new IcebergCompatRequiredTablePropertyEnforcer<>( + tableConfigProperty, (value) -> value, "true", postProcessor); + } + + /** + * Common set of allowed table features shared across all Iceberg writer compatibility versions. + * This includes the incompatible legacy features (invariants, changeDataFeed, checkConstraints, + * identityColumns, generatedColumns) because they may be present in the table protocol even when + * they are not in use. In later checks we validate that these incompatible features are inactive + * in the table. See the protocol spec for more details. 
+ */ + protected static final Set COMMON_ALLOWED_FEATURES = + Stream.of( + // Incompatible legacy table features + INVARIANTS_W_FEATURE, + CHANGE_DATA_FEED_W_FEATURE, + CONSTRAINTS_W_FEATURE, + IDENTITY_COLUMNS_W_FEATURE, + GENERATED_COLUMNS_W_FEATURE, + // Compatible table features + APPEND_ONLY_W_FEATURE, + COLUMN_MAPPING_RW_FEATURE, + DOMAIN_METADATA_W_FEATURE, + VACUUM_PROTOCOL_CHECK_RW_FEATURE, + CHECKPOINT_V2_RW_FEATURE, + IN_COMMIT_TIMESTAMP_W_FEATURE, + CLUSTERING_W_FEATURE, + TIMESTAMP_NTZ_RW_FEATURE, + TYPE_WIDENING_RW_FEATURE, + TYPE_WIDENING_RW_PREVIEW_FEATURE) + .collect(toSet()); + protected static IcebergCompatCheck createUnsupportedFeaturesCheck( IcebergWriterCompatMetadataValidatorAndUpdater instance) { return (inputContext) -> { @@ -209,6 +253,16 @@ protected static IcebergCompatCheck createUnsupportedFeaturesCheck( } }; + protected static final List COMMON_CHECKS = + Arrays.asList( + UNSUPPORTED_TYPES_CHECK, + PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK, + INVARIANTS_INACTIVE_CHECK, + CHANGE_DATA_FEED_INACTIVE_CHECK, + CHECK_CONSTRAINTS_INACTIVE_CHECK, + IDENTITY_COLUMNS_INACTIVE_CHECK, + GENERATED_COLUMNS_INACTIVE_CHECK); + @Override abstract String compatFeatureName(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdater.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdater.java index ac3675f13ae..fc2e321aa74 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdater.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdater.java @@ -110,45 +110,29 @@ public static Optional validateAndUpdateIcebergWriterCompatV1Metadata( private static final IcebergWriterCompatV1MetadataValidatorAndUpdater INSTANCE = new IcebergWriterCompatV1MetadataValidatorAndUpdater(); - private static final IcebergCompatRequiredTablePropertyEnforcer ICEBERG_COMPAT_V2_ENABLED = - new IcebergCompatRequiredTablePropertyEnforcer<>( - TableConfig.ICEBERG_COMPAT_V2_ENABLED, - (value) -> value, - "true", - (inputContext) -> - IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata( - inputContext.isCreatingNewTable, - inputContext.newMetadata, - inputContext.newProtocol)); + /** + * Enforcer for Iceberg compatibility V2 (required by V1). Ensures the ICEBERG_COMPAT_V2_ENABLED + * property is set to "true" and delegates validation to the V2 metadata validator. + */ + private static final IcebergCompatRequiredTablePropertyEnforcer + ICEBERG_COMPAT_V2_ENABLED = + createIcebergCompatEnforcer( + TableConfig.ICEBERG_COMPAT_V2_ENABLED, + (inputContext) -> + IcebergCompatV2MetadataValidatorAndUpdater + .validateAndUpdateIcebergCompatV2Metadata( + inputContext.isCreatingNewTable, + inputContext.newMetadata, + inputContext.newProtocol)); /** - * Current set of allowed table features. This may evolve as the protocol evolves. This includes - * the incompatible legacy features (invariants, changeDataFeed, checkConstraints, - * identityColumns, generatedColumns) because they may be present in the table protocol even when - * they are not in use. In later checks we validate that these incompatible features are inactive - * in the table. See the protocol spec for more details. + * Current set of allowed table features for Iceberg writer compat V1. 
This combines the common + * features with V1-specific features (ICEBERG_COMPAT_V2_W_FEATURE, ICEBERG_WRITER_COMPAT_V1). */ private static Set ALLOWED_TABLE_FEATURES = - Stream.of( - // Incompatible legacy table features - INVARIANTS_W_FEATURE, - CHANGE_DATA_FEED_W_FEATURE, - CONSTRAINTS_W_FEATURE, - IDENTITY_COLUMNS_W_FEATURE, - GENERATED_COLUMNS_W_FEATURE, - // Compatible table features - APPEND_ONLY_W_FEATURE, - COLUMN_MAPPING_RW_FEATURE, - ICEBERG_COMPAT_V2_W_FEATURE, - ICEBERG_WRITER_COMPAT_V1, - DOMAIN_METADATA_W_FEATURE, - VACUUM_PROTOCOL_CHECK_RW_FEATURE, - CHECKPOINT_V2_RW_FEATURE, - IN_COMMIT_TIMESTAMP_W_FEATURE, - CLUSTERING_W_FEATURE, - TIMESTAMP_NTZ_RW_FEATURE, - TYPE_WIDENING_RW_FEATURE, - TYPE_WIDENING_RW_PREVIEW_FEATURE) + Stream.concat( + COMMON_ALLOWED_FEATURES.stream(), + Stream.of(ICEBERG_COMPAT_V2_W_FEATURE, ICEBERG_WRITER_COMPAT_V1)) .collect(toSet()); /** @@ -215,15 +199,7 @@ protected Set getAllowedTableFeatures() { @Override List icebergCompatChecks() { - return Stream.of( - createUnsupportedFeaturesCheck(this), // Pass 'this' instance - UNSUPPORTED_TYPES_CHECK, - PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK, - INVARIANTS_INACTIVE_CHECK, - CHANGE_DATA_FEED_INACTIVE_CHECK, - CHECK_CONSTRAINTS_INACTIVE_CHECK, - IDENTITY_COLUMNS_INACTIVE_CHECK, - GENERATED_COLUMNS_INACTIVE_CHECK) + return Stream.concat(Stream.of(createUnsupportedFeaturesCheck(this)), COMMON_CHECKS.stream()) .collect(toList()); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdater.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdater.java new file mode 100644 index 00000000000..83ebf64b7b3 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdater.java @@ -0,0 +1,146 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.icebergcompat; + +import static io.delta.kernel.internal.tablefeatures.TableFeatures.*; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +import io.delta.kernel.exceptions.KernelException; +import io.delta.kernel.internal.TableConfig; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.tablefeatures.TableFeature; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +public class IcebergWriterCompatV3MetadataValidatorAndUpdater + extends IcebergWriterCompatMetadataValidatorAndUpdater { + + /** + * Validates that any change to property {@link TableConfig#ICEBERG_WRITER_COMPAT_V3_ENABLED} is + * valid. Currently, the changes we support are + * + *
    + *
  • No change in enablement (true to true or false to false) + *
  • Enabling but only on a new table (false to true) + *
+ * + * The changes that we do not support and for which we throw an {@link KernelException} are + * + *
    + *
  • Disabling on an existing table (true to false) + *
  • Enabling on an existing table (false to true) + *
+ */ + public static void validateIcebergWriterCompatV3Change( + Map oldConfig, Map newConfig, boolean isNewTable) { + blockConfigChangeOnExistingTable( + TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED, oldConfig, newConfig, isNewTable); + } + + /** + * Validate and update the given Iceberg Writer Compat V3 metadata. + * + * @param newMetadata Metadata after the current updates + * @param newProtocol Protocol after the current updates + * @return The updated metadata if the metadata is valid and updated, otherwise empty. + * @throws UnsupportedOperationException if the metadata is not compatible with Iceberg Writer V3 + * requirements + */ + public static Optional validateAndUpdateIcebergWriterCompatV3Metadata( + boolean isCreatingNewTable, Metadata newMetadata, Protocol newProtocol) { + return INSTANCE.validateAndUpdateMetadata( + new IcebergCompatInputContext( + INSTANCE.compatFeatureName(), isCreatingNewTable, newMetadata, newProtocol)); + } + + /// ////////////////////////////////////////////////////////////////////////////// + /// Define the compatibility and update checks for icebergWriterCompatV3 /// + /// ////////////////////////////////////////////////////////////////////////////// + + private static final IcebergWriterCompatV3MetadataValidatorAndUpdater INSTANCE = + new IcebergWriterCompatV3MetadataValidatorAndUpdater(); + + /** + * Enforcer for Iceberg compatibility V3. Ensures the ICEBERG_COMPAT_V3_ENABLED property is set to + * "true" and delegates validation to the V3 metadata validator. + */ + private static final IcebergCompatRequiredTablePropertyEnforcer + ICEBERG_COMPAT_V3_ENABLED = + createIcebergCompatEnforcer( + TableConfig.ICEBERG_COMPAT_V3_ENABLED, + (inputContext) -> + IcebergCompatV3MetadataValidatorAndUpdater + .validateAndUpdateIcebergCompatV3Metadata( + inputContext.isCreatingNewTable, + inputContext.newMetadata, + inputContext.newProtocol)); + + /** + * Current set of allowed table features for Iceberg writer compat V3. This combines the common + * features with V3-specific features including variant support, deletion vectors, and row + * tracking. 
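To make the enablement rules documented earlier in this file concrete, here is a small hedged sketch (not part of this patch) of `validateIcebergWriterCompatV3Change`; the configuration maps are hypothetical.

// Sketch only; TableConfig and the validator are the classes touched by this patch.
Map<String, String> oldConfig = Collections.emptyMap(); // property currently unset (false)
Map<String, String> newConfig =
    Collections.singletonMap(TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey(), "true");

// Allowed: enabling while creating a new table (false -> true on a new table).
IcebergWriterCompatV3MetadataValidatorAndUpdater.validateIcebergWriterCompatV3Change(
    oldConfig, newConfig, true /* isNewTable */);

// Throws a KernelException: enabling (or disabling) the property on an existing table.
IcebergWriterCompatV3MetadataValidatorAndUpdater.validateIcebergWriterCompatV3Change(
    oldConfig, newConfig, false /* isNewTable */);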
+ */ + private static Set ALLOWED_TABLE_FEATURES = + Stream.concat( + COMMON_ALLOWED_FEATURES.stream(), + Stream.of( + ICEBERG_COMPAT_V3_W_FEATURE, + ICEBERG_WRITER_COMPAT_V3, + DELETION_VECTORS_RW_FEATURE, + VARIANT_RW_FEATURE, + VARIANT_SHREDDING_PREVIEW_RW_FEATURE, + VARIANT_RW_PREVIEW_FEATURE, + ROW_TRACKING_W_FEATURE)) + .collect(toSet()); + + @Override + String compatFeatureName() { + return "icebergWriterCompatV3"; + } + + @Override + TableConfig requiredDeltaTableProperty() { + return TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED; + } + + @Override + List requiredDeltaTableProperties() { + return Stream.of(CM_ID_MODE_ENABLED, ICEBERG_COMPAT_V3_ENABLED).collect(toList()); + } + + @Override + List requiredDependencyTableFeatures() { + return Stream.of( + ICEBERG_WRITER_COMPAT_V3, ICEBERG_COMPAT_V3_W_FEATURE, COLUMN_MAPPING_RW_FEATURE) + .collect(toList()); + } + + @Override + List icebergCompatChecks() { + return Stream.concat(Stream.of(createUnsupportedFeaturesCheck(this)), COMMON_CHECKS.stream()) + .collect(toList()); + } + + @Override + protected Set getAllowedTableFeatures() { + return ALLOWED_TABLE_FEATURES; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java index 76193a13e62..e9d40862813 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java @@ -328,9 +328,11 @@ private static Optional assignColumnIdAndPhysicalName( Metadata metadata, boolean isNewTable) { StructType oldSchema = metadata.getSchema(); - // When icebergWriterCompatV1 is enabled we require physicalName='col-[columnId]' + // When icebergWriterCompatV1 or icebergWriterCompatV3 is enabled we require + // physicalName='col-[columnId]' boolean useColumnIdForPhysicalName = - TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.fromMetadata(metadata); + TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.fromMetadata(metadata) + || TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.fromMetadata(metadata); // This is the maxColumnId to use when assigning any new field-ids; we update this as we // traverse the schema and after traversal this is the value that should be stored in the diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala new file mode 100644 index 00000000000..956c94e84d9 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala @@ -0,0 +1,484 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.icebergcompat + +import java.util.Optional + +import scala.collection.JavaConverters._ + +import io.delta.kernel.exceptions.KernelException +import io.delta.kernel.internal.TableConfig +import io.delta.kernel.internal.actions.{Metadata, Protocol} +import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV3MetadataValidatorAndUpdater.validateAndUpdateIcebergWriterCompatV3Metadata +import io.delta.kernel.internal.tablefeatures.TableFeature +import io.delta.kernel.internal.tablefeatures.TableFeatures.{COLUMN_MAPPING_RW_FEATURE, DELETION_VECTORS_RW_FEATURE, ICEBERG_COMPAT_V3_W_FEATURE, ICEBERG_WRITER_COMPAT_V3, ROW_TRACKING_W_FEATURE, TYPE_WIDENING_RW_FEATURE, TYPE_WIDENING_RW_PREVIEW_FEATURE, VARIANT_RW_FEATURE, VARIANT_RW_PREVIEW_FEATURE, VARIANT_SHREDDING_PREVIEW_RW_FEATURE} +import io.delta.kernel.internal.util.ColumnMapping +import io.delta.kernel.types.{ByteType, DataType, DateType, FieldMetadata, IntegerType, LongType, ShortType, StructField, StructType, TimestampNTZType, TypeChange} + +class IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite + extends IcebergCompatV3MetadataValidatorAndUpdaterSuiteBase { + override def validateAndUpdateIcebergCompatMetadata( + isNewTable: Boolean, + metadata: Metadata, + protocol: Protocol): Optional[Metadata] = { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + + val icebergWriterCompatV3EnabledProps = Map( + TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey -> "true") + + val icebergCompatV3EnabledProps = Map( + TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey -> "true") + + val deletionVectorsEnabledProps = Map( + TableConfig.DELETION_VECTORS_CREATION_ENABLED.getKey -> "true") + + val variantShreddingEnabledProps = Map( + TableConfig.VARIANT_SHREDDING_ENABLED.getKey -> "true") + + val columnMappingIdModeProps = Map( + TableConfig.COLUMN_MAPPING_MODE.getKey -> "id") + + val rowTrackingEnabledProps = Map( + TableConfig.ROW_TRACKING_ENABLED.getKey -> "true") + + override def getCompatEnabledMetadata( + schema: StructType, + columnMappingMode: String = "id", + partCols: Seq[String] = Seq.empty): Metadata = { + val result = testMetadata(schema, partCols) + .withMergedConfiguration(( + icebergWriterCompatV3EnabledProps ++ + icebergCompatV3EnabledProps ++ + columnMappingIdModeProps ++ + rowTrackingEnabledProps).asJava) + + result + } + + override def getCompatEnabledProtocol(tableFeatures: TableFeature*): Protocol = { + testProtocol(tableFeatures ++ + Seq( + ICEBERG_WRITER_COMPAT_V3, + ICEBERG_COMPAT_V3_W_FEATURE, + DELETION_VECTORS_RW_FEATURE, + VARIANT_RW_FEATURE, + VARIANT_SHREDDING_PREVIEW_RW_FEATURE, + VARIANT_RW_PREVIEW_FEATURE, + ROW_TRACKING_W_FEATURE, + COLUMN_MAPPING_RW_FEATURE): _*) + } + + /* icebergWriterCompatV3 restricts additional types allowed by icebergCompatV3 */ + override def simpleTypesToSkip: Set[DataType] = Set(ByteType.BYTE, ShortType.SHORT) + + private def checkUnsupportedOrIncompatibleFeature( + tableFeature: String, + expectedErrorMessageContains: String): Unit = { + val protocol = new Protocol( + 3, + 7, + Set("columnMapping", "rowTracking").asJava, + Set( + "columnMapping", + "icebergCompatV3", + "icebergWriterCompatV3", + "deletionVectors", + "rowTracking", + "variantType", + "variantType-preview", + "variantShredding-preview", + tableFeature).asJava) + val metadata = getCompatEnabledMetadata(cmTestSchema()) + Seq(true, false).foreach { isNewTable => + val e = intercept[KernelException] { + 
validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains(expectedErrorMessageContains)) + } + } + + private def testIncompatibleActiveLegacyFeature( + activeFeatureMetadata: Metadata, + tableFeature: String): Unit = { + Seq(true, false).foreach { isNewTable => + test(s"cannot enable with $tableFeature active, isNewTable = $isNewTable") { + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata( + isNewTable, + activeFeatureMetadata, + getCompatEnabledProtocol()) + } + assert(e.getMessage.contains( + s"Table features [$tableFeature] are incompatible with icebergWriterCompatV3")) + } + } + } + + /* --- CM_ID_MODE_ENABLED and PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK tests --- */ + + Seq(true, false).foreach { isNewTable => + Seq(true, false).foreach { icebergCompatV3Enabled => + test(s"column mapping mode `id` is auto enabled when icebergWriterCompatV3 is enabled, " + + s"isNewTable = $isNewTable, icebergCompatV3Enabled = $icebergCompatV3Enabled") { + + val tblProperties = icebergWriterCompatV3EnabledProps ++ + (if (icebergCompatV3Enabled) { + icebergCompatV3EnabledProps + } else { + Map() + }) + val metadata = testMetadata(cmTestSchema(), tblProps = tblProperties) + val protocol = getCompatEnabledProtocol() + + assert(!metadata.getConfiguration.containsKey("delta.columnMapping.mode")) + + if (isNewTable) { + val updatedMetadata = + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + assert(updatedMetadata.isPresent) + assert(updatedMetadata.get().getConfiguration.get("delta.columnMapping.mode") == "id") + // We correctly populate the column mapping metadata + verifyCMTestSchemaHasValidColumnMappingInfo( + updatedMetadata.get(), + isNewTable, + true, + true) + } else { + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains( + "The value 'none' for the property 'delta.columnMapping.mode' is" + + " not compatible with icebergWriterCompatV3 requirements")) + } + } + } + } + + test("checks are not enforced when table property is not enabled") { + // Violate check by including BYTE type column + val schema = new StructType().add("col", ByteType.BYTE) + val metadata = testMetadata(schema) + assert(!TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.fromMetadata(metadata)) + validateAndUpdateIcebergWriterCompatV3Metadata( + true, /* isNewTable */ + metadata, + getCompatEnabledProtocol()) + } + + Seq("name", "none").foreach { cmMode => + Seq(true, false).foreach { isNewTable => + test(s"cannot enable icebergWriterCompatV3 with incompatible column mapping mode " + + s"`$cmMode`, isNewTable = $isNewTable") { + val tblProperties = icebergWriterCompatV3EnabledProps ++ + Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> cmMode) + + val metadata = testMetadata(cmTestSchema(), tblProps = tblProperties) + val protocol = getCompatEnabledProtocol() + + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains( + s"The value '$cmMode' for the property 'delta.columnMapping.mode' is" + + " not compatible with icebergWriterCompatV3 requirements")) + } + } + } + + Seq(true, false).foreach { isNewTable => + test(s"cannot set physicalName to anything other than col-{fieldId}, isNewTable=$isNewTable") { + val schema = new StructType() + .add( + "c1", + IntegerType.INTEGER, + FieldMetadata.builder() + 
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "c1") + .build()) + + val metadata = getCompatEnabledMetadata(schema) + val protocol = getCompatEnabledProtocol() + + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains( + "requires column mapping field physical names be equal to " + + "'col-[fieldId]', but this is not true for the following fields " + + "[c1(physicalName='c1', columnId=1)]")) + } + } + + Seq(true, false).foreach { isNewTable => + test(s"can provide correct physicalName=col-{fieldId}, isNewTable=$isNewTable") { + val schema = new StructType() + .add( + "c1", + IntegerType.INTEGER, + FieldMetadata.builder() + .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-1") + .build()) + + val metadata = getCompatEnabledMetadata(schema) + .withMergedConfiguration(Map(ColumnMapping.COLUMN_MAPPING_MAX_COLUMN_ID_KEY -> "1").asJava) + val protocol = getCompatEnabledProtocol() + + val updatedMetadata = + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + // No metadata update happens + assert(!updatedMetadata.isPresent) + } + } + + /* --- UNSUPPORTED_TYPES_CHECK tests --- */ + + Seq(ByteType.BYTE, ShortType.SHORT).foreach { unsupportedType => + Seq(true, false).foreach { isNewTable => + test(s"disallowed data types: $unsupportedType, new table = $isNewTable") { + val schema = new StructType().add("col", unsupportedType) + val metadata = getCompatEnabledMetadata(schema) + val protocol = getCompatEnabledProtocol() + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains( + s"icebergWriterCompatV3 does not support the data types: ")) + } + } + } + + test("compatible type widening is allowed with icebergWriterCompatV3") { + val schema = new StructType() + .add( + new StructField( + "intToLong", + IntegerType.INTEGER, + true, + FieldMetadata.builder() + .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-1") + .build()).withTypeChanges(Seq(new TypeChange( + IntegerType.INTEGER, + LongType.LONG)).asJava)) + + val metadata = getCompatEnabledMetadata(schema) + .withMergedConfiguration(Map(ColumnMapping.COLUMN_MAPPING_MAX_COLUMN_ID_KEY -> "1").asJava) + val protocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE) + + validateAndUpdateIcebergCompatMetadata(false, metadata, protocol) + } + + test("incompatible type widening throws exception with icebergWriterCompatV3") { + val schema = new StructType() + .add( + new StructField( + "dateToTimestamp", + TimestampNTZType.TIMESTAMP_NTZ, + true, + FieldMetadata.builder() + .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-1") + .build()).withTypeChanges( + Seq(new TypeChange(DateType.DATE, TimestampNTZType.TIMESTAMP_NTZ)).asJava)) + + val metadata = getCompatEnabledMetadata(schema) + .withMergedConfiguration(Map(ColumnMapping.COLUMN_MAPPING_MAX_COLUMN_ID_KEY -> "1").asJava) + val protocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE, ROW_TRACKING_W_FEATURE) + + val e = intercept[KernelException] { + validateAndUpdateIcebergCompatMetadata(false, metadata, protocol) + } + assert(e.getMessage.contains("icebergCompatV3 does not support type widening present in table")) + } + 
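Returning to the `AddFile` changes earlier in this patch, a hedged sketch (not part of this patch) of the new deletion-vector-aware `convertDataFileStatus` overload. The `DeletionVectorDescriptor` constructor shape is assumed to mirror the getters consumed by `createAddFileRow`, and `physicalSchema`, `tableRoot`, and `dataFileStatus` are placeholders.

// Sketch only; the descriptor values below are hypothetical.
DeletionVectorDescriptor dv =
    new DeletionVectorDescriptor(
        "u", // storageType: DV stored in a file named by a UUID relative to the table root
        "ab^-aqEH.-t@S}K{vb[*k^", // pathOrInlineDv (encoded UUID, example from the protocol spec)
        Optional.of(1), // offset of the DV data within that file
        36, // sizeInBytes of the serialized DV
        2L); // cardinality: number of rows marked deleted

AddFile addWithDv =
    AddFile.convertDataFileStatus(
        physicalSchema,
        tableRoot,
        dataFileStatus,
        Collections.emptyMap(), // partitionValues
        true /* dataChange */,
        Collections.emptyMap(), // tags
        dv); // pass null here to omit the deletion vector, as the 6-arg overload does
Row action = SingleAction.createAddFileSingleAction(addWithDv.toRow());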
+ /* --- ICEBERG_COMPAT_V3_ENABLED tests --- */ + + Seq(true, false).foreach { isNewTable => + test(s"icebergCompatV3 is auto enabled when icebergWriterCompatV3 is enabled, " + + s"isNewTable = $isNewTable") { + val metadata = testMetadata( + cmTestSchema(), + tblProps = + icebergWriterCompatV3EnabledProps ++ columnMappingIdModeProps ++ rowTrackingEnabledProps) + val protocol = getCompatEnabledProtocol() + assert(!TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(metadata)) + + if (isNewTable) { + val updatedMetadata = + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + assert(updatedMetadata.isPresent) + assert(TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(updatedMetadata.get)) + } else { + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains( + "The value 'false' for the property 'delta.enableIcebergCompatV3' is" + + " not compatible with icebergWriterCompatV3 requirements")) + } + } + } + + Seq(true, false).foreach { isNewTable => + test(s"cannot enable icebergWriterCompatV3 with icebergCompatV3 explicitly disabled, " + + s"isNewTable = $isNewTable") { + val tblProperties = icebergWriterCompatV3EnabledProps ++ columnMappingIdModeProps ++ + rowTrackingEnabledProps ++ + Map(TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey -> "false") + + val metadata = testMetadata(cmTestSchema(), tblProps = tblProperties) + val protocol = getCompatEnabledProtocol() + + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + assert(e.getMessage.contains( + "The value 'false' for the property 'delta.enableIcebergCompatV3' is" + + " not compatible with icebergWriterCompatV3 requirements")) + } + } + /* --- UNSUPPORTED_FEATURES_CHECK tests --- */ + + test("all supported features are allowed") { + val readerFeatures = + Set("columnMapping", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "rowTracking") + val writerFeatures = Set( + // Legacy incompatible features (allowed as long as they are inactive) + "invariants", + "checkConstraints", + "changeDataFeed", + "identityColumns", + "generatedColumns", + // Compatible table features + "appendOnly", + "columnMapping", + "icebergCompatV3", + "icebergWriterCompatV3", + "domainMetadata", + "vacuumProtocolCheck", + "v2Checkpoint", + "inCommitTimestamp", + "clustering", + "typeWidening", + "typeWidening-preview", + "timestampNtz", + "deletionVectors", + "rowTracking", + "variantType", + "variantType-preview", + "variantShredding-preview") + val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava) + val metadata = getCompatEnabledMetadata(cmTestSchema()) + validateAndUpdateIcebergWriterCompatV3Metadata(true, metadata, protocol) + validateAndUpdateIcebergWriterCompatV3Metadata(false, metadata, protocol) + } + + Seq("collations", "defaultColumns").foreach { unsupportedIncompatibleFeature => + test(s"cannot enable with incompatible UNSUPPORTED feature $unsupportedIncompatibleFeature") { + // We add this test here so that it will fail when we add Kernel support for these features + // When this happens -> add the feature to the test above + checkUnsupportedOrIncompatibleFeature( + unsupportedIncompatibleFeature, + "Unsupported Delta table feature: table requires feature " + + s""""$unsupportedIncompatibleFeature" which is unsupported by this version of """ + + "Delta Kernel") + } + } + + /* --- INVARIANTS_INACTIVE_CHECK tests --- */ + 
testIncompatibleActiveLegacyFeature( + getCompatEnabledMetadata(new StructType() + .add("c1", IntegerType.INTEGER) + .add( + "c2", + IntegerType.INTEGER, + FieldMetadata.builder() + .putString("delta.invariants", "{\"expression\": { \"expression\": \"x > 3\"} }") + .build())), + "invariants") + + /* --- CHANGE_DATA_FEED_INACTIVE_CHECK tests --- */ + testIncompatibleActiveLegacyFeature( + getCompatEnabledMetadata(cmTestSchema()) + .withMergedConfiguration(Map(TableConfig.CHANGE_DATA_FEED_ENABLED.getKey -> "true").asJava), + "changeDataFeed") + + /* --- CHECK_CONSTRAINTS_INACTIVE_CHECK tests --- */ + testIncompatibleActiveLegacyFeature( + getCompatEnabledMetadata(cmTestSchema()) + .withMergedConfiguration(Map("delta.constraints.a" -> "a = b").asJava), + "checkConstraints") + + /* --- IDENTITY_COLUMNS_INACTIVE_CHECK tests --- */ + testIncompatibleActiveLegacyFeature( + getCompatEnabledMetadata(new StructType() + .add("c1", IntegerType.INTEGER) + .add( + "c2", + IntegerType.INTEGER, + FieldMetadata.builder() + .putLong("delta.identity.start", 1L) + .putLong("delta.identity.step", 2L) + .putBoolean("delta.identity.allowExplicitInsert", true) + .build())), + "identityColumns") + + /* --- GENERATED_COLUMNS_INACTIVE_CHECK tests --- */ + testIncompatibleActiveLegacyFeature( + getCompatEnabledMetadata(new StructType() + .add("c1", IntegerType.INTEGER) + .add( + "c2", + IntegerType.INTEGER, + FieldMetadata.builder() + .putString("delta.generationExpression", "{\"expression\": \"c1 + 1\"}") + .build())), + "generatedColumns") + + /* --- requiredDependencyTableFeatures tests --- */ + Seq( + ("columnMapping", "icebergCompatV3"), + ("icebergCompatV3", "columnMapping"), + ("rowTracking", "icebergCompatV3"), + ("deletionVectors", "icebergCompatV3"), + ("variantType", "icebergCompatV3")).foreach { + case (featureToIncludeStr, missingFeatureStr) => + Seq(true, false).foreach { isNewTable => + test( + s"protocol is missing required feature $missingFeatureStr when " + + s"only $featureToIncludeStr present, isNewTable = $isNewTable") { + val metadata = getCompatEnabledMetadata(cmTestSchema()) + val readerFeatures: Set[String] = + if (Set("columnMapping", "rowTracking").contains(featureToIncludeStr)) { + Set(featureToIncludeStr) + } else Set.empty + val writerFeatures = Set("icebergWriterCompatV3", featureToIncludeStr) + val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava) + val e = intercept[KernelException] { + validateAndUpdateIcebergWriterCompatV3Metadata(isNewTable, metadata, protocol) + } + // Since we run icebergCompatV3 validation as part of + // ICEBERG_COMPAT_V3_ENABLED.postProcess we actually hit the missing feature error in the + // icebergCompatV3 checks first + assert(e.getMessage.contains( + s"icebergCompatV3: requires the feature '$missingFeatureStr' to be enabled")) + } + } + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CommitIcebergActionSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CommitIcebergActionSuite.scala index 19123b805b8..99d57617529 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CommitIcebergActionSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CommitIcebergActionSuite.scala @@ -40,99 +40,176 @@ class CommitIcebergActionSuite extends DeltaTableWriteSuiteBase { private val tblPropertiesIcebergWriterCompatV1Enabled = Map( TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.getKey -> "true") + private val 
tblPropertiesIcebergWriterCompatV3Enabled = Map( + TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey -> "true") + /* ----- Error cases ----- */ - test("requires that maxRetries = 0") { - withTempDirAndEngine { (tablePath, engine) => - val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) - .withSchema(engine, testSchema) - .withTableProperties(engine, tblPropertiesIcebergWriterCompatV1Enabled.asJava) - .build(engine) - intercept[UnsupportedOperationException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ - ) - } - intercept[UnsupportedOperationException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ - ) + Seq("V1", "V3").foreach { version => + test(s"$version: requires that maxRetries = 0") { + withTempDirAndEngine { (tablePath, engine) => + val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled + else tblPropertiesIcebergWriterCompatV3Enabled + + val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) + .withSchema(engine, testSchema) + .withTableProperties(engine, properties.asJava) + .build(engine) + intercept[UnsupportedOperationException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } + intercept[UnsupportedOperationException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } } } - } - test("requires that icebergWriterCompatV1 enabled") { - withTempDirAndEngine { (tablePath, engine) => - val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) - .withSchema(engine, testSchema) - .withMaxRetries(0) - .build(engine) - intercept[UnsupportedOperationException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ - ) - } - intercept[UnsupportedOperationException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ - ) + test(s"$version: requires that icebergWriterCompat${version} enabled") { + withTempDirAndEngine { (tablePath, engine) => + val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) + .withSchema(engine, testSchema) + .withMaxRetries(0) + .build(engine) + 
intercept[UnsupportedOperationException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } + intercept[UnsupportedOperationException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } } } - } - test("partitioned tables unsupported") { - withTempDirAndEngine { (tablePath, engine) => - val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) - .withSchema(engine, testPartitionSchema) - .withTableProperties(engine, tblPropertiesIcebergWriterCompatV1Enabled.asJava) - .withMaxRetries(0) - .withPartitionColumns(engine, testPartitionColumns.asJava) - .build(engine) - intercept[UnsupportedOperationException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ - ) - } - intercept[UnsupportedOperationException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ - ) + test(s"$version: partitioned tables unsupported") { + withTempDirAndEngine { (tablePath, engine) => + val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled + else tblPropertiesIcebergWriterCompatV3Enabled + + val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) + .withSchema(engine, testPartitionSchema) + .withTableProperties(engine, properties.asJava) + .withMaxRetries(0) + .withPartitionColumns(engine, testPartitionColumns.asJava) + .build(engine) + intercept[UnsupportedOperationException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } + intercept[UnsupportedOperationException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } } } - } - test("cannot 
create add without stats present") { - withTempDirAndEngine { (tablePath, engine) => - val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) - .withSchema(engine, testSchema) - .withTableProperties(engine, tblPropertiesIcebergWriterCompatV1Enabled.asJava) - .withMaxRetries(0) - .build(engine) - intercept[KernelException] { - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet", includeStats = false), - Collections.emptyMap(), - true /* dataChange */ - ) + test(s"$version: cannot create add without stats present") { + withTempDirAndEngine { (tablePath, engine) => + val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled + else tblPropertiesIcebergWriterCompatV3Enabled + + val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath)) + .withSchema(engine, testSchema) + .withTableProperties(engine, properties.asJava) + .withMaxRetries(0) + .build(engine) + intercept[KernelException] { + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet", includeStats = false), + Collections.emptyMap(), + true /* dataChange */ + ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet", includeStats = false), + Collections.emptyMap(), + true /* dataChange */ + ) + } + } } } } @@ -225,255 +302,359 @@ class CommitIcebergActionSuite extends DeltaTableWriteSuiteBase { assert(addsFoundSet == expectedAdds.map(_.copy(dataChange = false))) } - test("Correctly commits adds to table and compat with Spark") { - withTempDirAndEngine { (tablePath, engine) => - // Create table - createEmptyTable( - engine, - tablePath, - testSchema, - tableProperties = tblPropertiesIcebergWriterCompatV1Enabled) - - // Append 1 add with dataChange = true - { - val txn = createTxn(engine, tablePath, maxRetries = 0) - val actionsToCommit = Seq( - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ )) - commitTransaction( - txn, - engine, - inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator()))) - } + Seq("V1", "V3").foreach { version => + test(s"$version: Correctly commits adds to table and compat with Spark") { + withTempDirAndEngine { (tablePath, engine) => + val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled + else tblPropertiesIcebergWriterCompatV3Enabled - // Append 1 add with dataChange = false (in theory this could involve updating stats but - // once we support remove add a case that looks like optimize/compaction) - { - val txn = createTxn(engine, tablePath, maxRetries = 0) - val actionsToCommit = Seq( - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - false /* dataChange */ )) - commitTransaction( - txn, + // Create table + createEmptyTable( engine, - inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator()))) - } + tablePath, + testSchema, + tableProperties = properties) + + // Append 1 add with dataChange = true + { + val txn = createTxn(engine, tablePath, maxRetries = 0) + val 
actionsToCommit = Seq( + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + true /* dataChange */ ) + }) + commitTransaction( + txn, + engine, + inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator()))) + } - // Verify we wrote the adds we expected into the JSON files using Kernel's getChanges - checkActionsWrittenInJson(engine, tablePath, 0, Set()) - checkActionsWrittenInJson( - engine, - tablePath, - 1, - Set(ExpectedAdd("file1.parquet", 1000, 10, true))) - checkActionsWrittenInJson( - engine, - tablePath, - 2, - Set(ExpectedAdd("file1.parquet", 1000, 10, false))) - - // Verify that Spark can read the actions written via log replay - checkSparkLogReplay(tablePath, 0, Set()) - checkSparkLogReplay(tablePath, 1, Set(ExpectedAdd("file1.parquet", 1000, 10, true))) - // We added the same path twice so only the second remains after log replay - checkSparkLogReplay(tablePath, 2, Set(ExpectedAdd("file1.parquet", 1000, 10, false))) - } - } + // Append 1 add with dataChange = false (in theory this could involve updating stats but + // once we support remove add a case that looks like optimize/compaction) + { + val txn = createTxn(engine, tablePath, maxRetries = 0) + val actionsToCommit = Seq( + if (version == "V1") { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + false /* dataChange */ ) + } else { + GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction( + txn.getTransactionState(engine), + generateDataFileStatus(tablePath, "file1.parquet"), + Collections.emptyMap(), + false /* dataChange */ ) + }) + commitTransaction( + txn, + engine, + inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator()))) + } - test("Correctly commits adds and removes to table and compat with Spark") { - withTempDirAndEngine { (tablePath, engine) => - // Create table - createEmptyTable( - engine, - tablePath, - testSchema, - tableProperties = tblPropertiesIcebergWriterCompatV1Enabled) - - // Append 1 add with dataChange = true - { - val txn = createTxn(engine, tablePath, maxRetries = 0) - val actionsToCommit = Seq( - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - true /* dataChange */ )) - commitTransaction( - txn, + // Verify we wrote the adds we expected into the JSON files using Kernel's getChanges + checkActionsWrittenInJson(engine, tablePath, 0, Set()) + checkActionsWrittenInJson( engine, - inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator()))) - } - - // Re-arrange data by removing that Add and adding a new Add - { - val txn = createTxn(engine, tablePath, maxRetries = 0) - val actionsToCommit = Seq( - GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction( - txn.getTransactionState(engine), - generateDataFileStatus(tablePath, "file1.parquet"), - Collections.emptyMap(), - false /* dataChange */ ), - 
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file2.parquet"),
-            Collections.emptyMap(),
-            false /* dataChange */ ))
-        commitTransaction(
-          txn,
+          tablePath,
+          1,
+          Set(ExpectedAdd("file1.parquet", 1000, 10, true)))
+        checkActionsWrittenInJson(
          engine,
-          inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+          tablePath,
+          2,
+          Set(ExpectedAdd("file1.parquet", 1000, 10, false)))
+
+        // Verify that Spark can read the actions written via log replay
+        checkSparkLogReplay(tablePath, 0, Set())
+        checkSparkLogReplay(tablePath, 1, Set(ExpectedAdd("file1.parquet", 1000, 10, true)))
+        // We added the same path twice so only the second remains after log replay
+        checkSparkLogReplay(tablePath, 2, Set(ExpectedAdd("file1.parquet", 1000, 10, false)))
      }
+    }
+
+    test(s"$version: Correctly commits adds and removes to table and compat with Spark") {
+      withTempDirAndEngine { (tablePath, engine) =>
+        val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled
+        else tblPropertiesIcebergWriterCompatV3Enabled

-      // Remove that add so that the table is empty
-      {
-        val txn = createTxn(engine, tablePath, maxRetries = 0)
-        val actionsToCommit = Seq(
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file2.parquet"),
-            Collections.emptyMap(),
-            true /* dataChange */ ))
-        commitTransaction(
-          txn,
+        // Create table
+        createEmptyTable(
          engine,
-          inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
-      }
+          tablePath,
+          testSchema,
+          tableProperties = properties)
+
+        // Append 1 add with dataChange = true
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          val actionsToCommit = Seq(
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */ )
+            })
+          commitTransaction(
+            txn,
+            engine,
+            inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+        }

-      // Verify we wrote the adds we expected into the JSON files using Kernel's getChanges
-      checkActionsWrittenInJson(engine, tablePath, 0, Set())
-      checkActionsWrittenInJson(
-        engine,
-        tablePath,
-        1,
-        Set(ExpectedAdd("file1.parquet", 1000, 10, true)))
-      checkActionsWrittenInJson(
-        engine,
-        tablePath,
-        2,
-        Set(
-          ExpectedAdd("file2.parquet", 1000, 10, false),
-          ExpectedRemove("file1.parquet", 1000, 10, false)))
-      checkActionsWrittenInJson(
-        engine,
-        tablePath,
-        3,
-        Set(ExpectedRemove("file2.parquet", 1000, 10, true)))
-
-      // Verify that Spark can read the actions written via log replay
-      checkSparkLogReplay(tablePath, 0, Set())
-      checkSparkLogReplay(tablePath, 1, Set(ExpectedAdd("file1.parquet", 1000, 10, true)))
-      checkSparkLogReplay(tablePath, 2, Set(ExpectedAdd("file2.parquet", 1000, 10, false)))
-      checkSparkLogReplay(tablePath, 3, Set())
-    }
-  }
+        // Re-arrange data by removing that Add and adding a new Add
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          val actionsToCommit = Seq(
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            },
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file2.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file2.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            })
+          commitTransaction(
+            txn,
+            engine,
+            inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+        }
+
+        // Remove that add so that the table is empty
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          val actionsToCommit = Seq(
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file2.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file2.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */ )
+            })
+          commitTransaction(
+            txn,
+            engine,
+            inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+        }
-  test("append-only configuration is observed when committing removes") {
-    withTempDirAndEngine { (tablePath, engine) =>
-      // Create table
-      createEmptyTable(
-        engine,
-        tablePath,
-        testSchema,
-        tableProperties = tblPropertiesIcebergWriterCompatV1Enabled ++ Map(
-          TableConfig.APPEND_ONLY_ENABLED.getKey -> "true"))
-
-      // Append 1 add with dataChange = true
-      {
-        val txn = createTxn(engine, tablePath, maxRetries = 0)
-        val actionsToCommit = Seq(
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file1.parquet"),
-            Collections.emptyMap(),
-            true /* dataChange */ ))
-        commitTransaction(
-          txn,
+        // Verify we wrote the adds we expected into the JSON files using Kernel's getChanges
+        checkActionsWrittenInJson(engine, tablePath, 0, Set())
+        checkActionsWrittenInJson(
+          engine,
+          tablePath,
+          1,
+          Set(ExpectedAdd("file1.parquet", 1000, 10, true)))
+        checkActionsWrittenInJson(
+          engine,
+          tablePath,
+          2,
+          Set(
+            ExpectedAdd("file2.parquet", 1000, 10, false),
+            ExpectedRemove("file1.parquet", 1000, 10, false)))
+        checkActionsWrittenInJson(
          engine,
-          inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+          tablePath,
+          3,
+          Set(ExpectedRemove("file2.parquet", 1000, 10, true)))
+
+        // Verify that Spark can read the actions written via log replay
+        checkSparkLogReplay(tablePath, 0, Set())
+        checkSparkLogReplay(tablePath, 1, Set(ExpectedAdd("file1.parquet", 1000, 10, true)))
+        checkSparkLogReplay(tablePath, 2, Set(ExpectedAdd("file2.parquet", 1000, 10, false)))
+        checkSparkLogReplay(tablePath, 3, Set())
      }
+    }
+
+    test(s"$version: append-only configuration is observed when committing removes") {
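+      // With delta.appendOnly enabled, removes with dataChange = true must be rejected,
+      // while removes with dataChange = false (e.g. re-arranging data) are still allowed.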
+      withTempDirAndEngine { (tablePath, engine) =>
+        val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled
+        else tblPropertiesIcebergWriterCompatV3Enabled

-      // Re-arrange data by removing that Add and adding a new Add
-      // (can commit remove with dataChange=false)
-      {
-        val txn = createTxn(engine, tablePath, maxRetries = 0)
-        val actionsToCommit = Seq(
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file1.parquet"),
-            Collections.emptyMap(),
-            false /* dataChange */ ),
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file2.parquet"),
-            Collections.emptyMap(),
-            false /* dataChange */ ))
-        commitTransaction(
-          txn,
+        // Create table
+        createEmptyTable(
          engine,
-          inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
-      }
+          tablePath,
+          testSchema,
+          tableProperties = properties ++ Map(
+            TableConfig.APPEND_ONLY_ENABLED.getKey -> "true"))
+
+        // Append 1 add with dataChange = true
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          val actionsToCommit = Seq(
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */ )
+            })
+          commitTransaction(
+            txn,
+            engine,
+            inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+        }

-      // Cannot create remove with dataChange=true
-      {
-        val txn = createTxn(engine, tablePath, maxRetries = 0)
-        intercept[KernelException] {
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file1.parquet"),
-            Collections.emptyMap(),
-            true /* dataChange */
-          )
+        // Re-arrange data by removing that Add and adding a new Add
+        // (can commit remove with dataChange=false)
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          val actionsToCommit = Seq(
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            },
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file2.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file2.parquet"),
+                Collections.emptyMap(),
+                false /* dataChange */ )
+            })
+          commitTransaction(
+            txn,
+            engine,
+            inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+        }
+
+        // Cannot create remove with dataChange=true
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          intercept[KernelException] {
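+            // Both the V1 and V3 helpers must throw for a remove with dataChange = true here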
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */
+              )
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true /* dataChange */
+              )
+            }
+          }
        }
      }
    }
-  }
-  test("Tags can be successfully passed for generating addFile") {
-    withTempDirAndEngine { (tablePath, engine) =>
-      // Create table
-      createEmptyTable(
-        engine,
-        tablePath,
-        testSchema,
-        tableProperties = tblPropertiesIcebergWriterCompatV1Enabled)
-
-      // Commit one add file with tags
-      val tags = Map("tag1" -> "abc", "tag2" -> "def")
-
-      {
-        val txn = createTxn(engine, tablePath, maxRetries = 0)
-        val actionsToCommit = Seq(
-          GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
-            txn.getTransactionState(engine),
-            generateDataFileStatus(tablePath, "file1.parquet"),
-            Collections.emptyMap(),
-            true, /* dataChange */
-            tags.asJava))
-        commitTransaction(
-          txn,
+    test(s"$version: Tags can be successfully passed for generating addFile") {
+      withTempDirAndEngine { (tablePath, engine) =>
+        val properties = if (version == "V1") tblPropertiesIcebergWriterCompatV1Enabled
+        else tblPropertiesIcebergWriterCompatV3Enabled
+
+        // Create table
+        createEmptyTable(
          engine,
-          inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
-      }
+          tablePath,
+          testSchema,
+          tableProperties = properties)
+
+        // Commit one add file with tags
+        val tags = Map("tag1" -> "abc", "tag2" -> "def")
+
+        {
+          val txn = createTxn(engine, tablePath, maxRetries = 0)
+          val actionsToCommit = Seq(
+            if (version == "V1") {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true, /* dataChange */
+                tags.asJava)
+            } else {
+              GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
+                txn.getTransactionState(engine),
+                generateDataFileStatus(tablePath, "file1.parquet"),
+                Collections.emptyMap(),
+                true, /* dataChange */
+                tags.asJava)
+            })
+          commitTransaction(
+            txn,
+            engine,
+            inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
+        }

-      // Read back committed ADD actions
-      val version = 1
-      val rows = Table.forPath(engine, tablePath).asInstanceOf[TableImpl]
-        .getChanges(engine, version, version, Set(DeltaAction.ADD).asJava)
-        .toSeq
-        .flatMap(_.getRows.toSeq)
-        .filterNot(row => row.isNullAt(row.getSchema.indexOf("add")))
+        // Read back committed ADD actions
+        val tableVersion = 1
+        val rows = Table.forPath(engine, tablePath).asInstanceOf[TableImpl]
+          .getChanges(engine, tableVersion, tableVersion, Set(DeltaAction.ADD).asJava)
+          .toSeq
+          .flatMap(_.getRows.toSeq)
+          .filterNot(row => row.isNullAt(row.getSchema.indexOf("add")))

-      assert(rows.size == 1)
+        assert(rows.size == 1)

-      val addFile = new AddFile(rows.head.getStruct(rows.head.getSchema.indexOf("add")))
-      assert(addFile.getTags.isPresent)
-      assert(VectorUtils.toJavaMap(addFile.getTags.get()).asScala.equals(tags))
+        val addFile = new AddFile(rows.head.getStruct(rows.head.getSchema.indexOf("add")))
+        assert(addFile.getTags.isPresent)
+        assert(VectorUtils.toJavaMap(addFile.getTags.get()).asScala.equals(tags))
+      }
    }
  }
}