@@ -41,6 +41,7 @@
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergUniversalFormatMetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.metrics.SnapshotMetrics;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
@@ -481,6 +482,16 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
newMetadata = icebergWriterCompatV1;
}

Optional<Metadata> icebergWriterCompatV3 =
IcebergWriterCompatV3MetadataValidatorAndUpdater
.validateAndUpdateIcebergWriterCompatV3Metadata(
isCreateOrReplace,
newMetadata.orElse(baseMetadata),
newProtocol.orElse(baseProtocol));
if (icebergWriterCompatV3.isPresent()) {
newMetadata = icebergWriterCompatV3;
}

// TODO: refactor this method to use a single validator and updater.
Optional<Metadata> icebergCompatV2Metadata =
IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata(
@@ -72,7 +72,7 @@ public class AddFile extends RowBackedAction {

/**
* Utility to generate {@link AddFile} action instance from the given {@link DataFileStatus} and
- * partition values.
+ * partition values. Uses a null (absent) deletion vector descriptor.
*/
public static AddFile convertDataFileStatus(
StructType physicalSchema,
@@ -81,6 +81,33 @@ public static AddFile convertDataFileStatus(
Map<String, Literal> partitionValues,
boolean dataChange,
Map<String, String> tags) {

return convertDataFileStatus(
physicalSchema,
tableRoot,
dataFileStatus,
partitionValues,
dataChange,
tags,
null); // DeletionVectorDescriptor is absent
}

/**
* Utility to generate {@link AddFile} action instance from the given {@link DataFileStatus},
* partition values, and an optional (nullable) {@link DeletionVectorDescriptor}.
*/
public static AddFile convertDataFileStatus(
StructType physicalSchema,
URI tableRoot,
DataFileStatus dataFileStatus,
Map<String, Literal> partitionValues,
boolean dataChange,
Map<String, String> tags,
DeletionVectorDescriptor deletionVectorDescriptor) {

Review comment: make the existing API take the deletionVectorDescriptor as an Optional. Given this is not a public API, we don't need to worry about existing users, and migration is simple anyway.
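A minimal sketch of that suggestion, assuming the internal callers are updated in the same change (this snippet is illustrative, not part of the diff):

public static AddFile convertDataFileStatus(
    StructType physicalSchema,
    URI tableRoot,
    DataFileStatus dataFileStatus,
    Map<String, Literal> partitionValues,
    boolean dataChange,
    Map<String, String> tags,
    Optional<DeletionVectorDescriptor> deletionVectorOpt) { // Optional instead of nullable
  // Body as in the diff below, except the null-to-Optional conversion disappears
  // and deletionVectorOpt flows straight through to createAddFileRow.
}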


Optional<DeletionVectorDescriptor> deletionVectorOpt =
    Optional.ofNullable(deletionVectorDescriptor);

Review comment: do we need to do any validations when a DV is added? For example, that the deletion vectors feature is enabled and active?
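One possible shape for such a guard — hypothetical, since it would require threading the table's Protocol into this utility, and the supportsFeature/TableFeatures names below are assumptions rather than confirmed Kernel APIs:

// Hypothetical validation: refuse to attach a DV unless the protocol
// actually enables the 'deletionVectors' table feature.
if (deletionVectorOpt.isPresent()
    && !protocol.supportsFeature(TableFeatures.DELETION_VECTORS_RW_FEATURE)) {
  throw new IllegalArgumentException(
      "Cannot attach a deletion vector: 'deletionVectors' table feature is not enabled");
}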

Optional<MapValue> tagMapValue =
!tags.isEmpty() ? Optional.of(VectorUtils.stringStringMapValue(tags)) : Optional.empty();
Row row =
@@ -91,8 +118,8 @@ public static AddFile convertDataFileStatus(
dataFileStatus.getSize(),
dataFileStatus.getModificationTime(),
dataChange,
- Optional.empty(), // deletionVector
- tagMapValue, // tags
+ deletionVectorOpt,
+ tagMapValue,
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
dataFileStatus.getStatistics());
@@ -116,8 +143,6 @@ public static Row createAddFileRow(

checkArgument(path != null, "path is not nullable");
checkArgument(partitionValues != null, "partitionValues is not nullable");
- // TODO - Add support for DeletionVectorDescriptor
- checkArgument(!deletionVector.isPresent(), "DeletionVectorDescriptor is unsupported");

Map<Integer, Object> fieldMap = new HashMap<>();
fieldMap.put(FULL_SCHEMA.indexOf("path"), path);
@@ -131,7 +156,20 @@ version -> fieldMap.put(FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version));
version -> fieldMap.put(FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version));
stats.ifPresent(
stat -> fieldMap.put(FULL_SCHEMA.indexOf("stats"), stat.serializeAsJson(physicalSchema)));

deletionVector.ifPresent(
dv -> {
Map<Integer, Object> dvFieldMap = new HashMap<>();
StructType dvSchema = DeletionVectorDescriptor.READ_SCHEMA;

dvFieldMap.put(dvSchema.indexOf("storageType"), dv.getStorageType());
dvFieldMap.put(dvSchema.indexOf("pathOrInlineDv"), dv.getPathOrInlineDv());
dv.getOffset().ifPresent(offset -> dvFieldMap.put(dvSchema.indexOf("offset"), offset));
dvFieldMap.put(dvSchema.indexOf("sizeInBytes"), dv.getSizeInBytes());
dvFieldMap.put(dvSchema.indexOf("cardinality"), dv.getCardinality());

Row dvRow = new GenericRow(dvSchema, dvFieldMap);
fieldMap.put(FULL_SCHEMA.indexOf("deletionVector"), dvRow);
});
return new GenericRow(FULL_SCHEMA, fieldMap);
}

@@ -34,6 +34,7 @@
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.statistics.DataFileStatistics;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
@@ -107,6 +108,66 @@ public static Row generateIcebergCompatWriterV1AddAction(
transactionState, fileStatus, partitionValues, dataChange, Collections.emptyMap());
}

/**
* Create an add action {@link Row} that can be passed to {@link Transaction#commit(Engine,
* CloseableIterable)} from an Iceberg add.
*
* @param transactionState the transaction state from the built transaction
* @param fileStatus the file status to create the add with (contains path, time, size, and stats)
* @param partitionValues the partition values for the add
* @param dataChange whether or not the add constitutes a dataChange (i.e. append vs. compaction)
* @param tags key-value metadata to be attached to the add action
* @return add action row that can be included in the transaction
* @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled
* @throws UnsupportedOperationException if maxRetries != 0 in the transaction
* @throws KernelException if stats are not present (required for icebergCompatV3)
* @throws UnsupportedOperationException if the table is partitioned (currently unsupported)
*/
public static Row generateIcebergCompatWriterV3AddAction(
Row transactionState,
DataFileStatus fileStatus,
Map<String, Literal> partitionValues,
boolean dataChange,
Map<String, String> tags) {
Map<String, String> configuration = TransactionStateRow.getConfiguration(transactionState);

/* ----- Validate that this is a valid usage of this API ----- */
validateIcebergWriterCompatV3Enabled(configuration);
validateMaxRetriesSetToZero(transactionState);

/* ----- Validate this is valid write given the table's protocol & configurations ----- */
checkState(
TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(configuration),
"icebergCompatV3 not enabled despite icebergWriterCompatV3 enabled");
// We require field `numRecords` when icebergCompatV3 is enabled
IcebergCompatV3MetadataValidatorAndUpdater.validateDataFileStatus(fileStatus);

/* --- Validate and update partitionValues ---- */
// Currently we don't support partitioned tables; fail here
blockPartitionedTables(transactionState, partitionValues);

URI tableRoot = new Path(TransactionStateRow.getTablePath(transactionState)).toUri();
// This takes care of relativizing the file path and serializing the file statistics
AddFile addFile =
AddFile.convertDataFileStatus(
TransactionStateRow.getPhysicalSchema(transactionState),
tableRoot,
fileStatus,
partitionValues,
dataChange,
tags);
return SingleAction.createAddFileSingleAction(addFile.toRow());
}
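For context, a hedged usage sketch; `engine`, `txn`, and `fileStatus` are placeholders, and the iterable helpers are assumed to be the usual Kernel utilities (none of this is part of the diff):

// Build a V3-compatible add action from the transaction state and commit it.
// partitionValues is empty because partitioned tables are currently blocked.
Row txnState = txn.getTransactionState(engine);
Row addAction =
    GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
        txnState, fileStatus, Collections.emptyMap() /* partitionValues */,
        true /* dataChange */, Collections.emptyMap() /* tags */);
txn.commit(
    engine,
    CloseableIterable.inMemoryIterable(
        Utils.toCloseableIterator(Collections.singletonList(addAction).iterator())));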

Review comment (on the overload below): same for this API.

public static Row generateIcebergCompatWriterV3AddAction(
Row transactionState,
DataFileStatus fileStatus,
Map<String, Literal> partitionValues,
boolean dataChange) {
return generateIcebergCompatWriterV3AddAction(
transactionState, fileStatus, partitionValues, dataChange, Collections.emptyMap());
}

/**
* Create a remove action {@link Row} that can be passed to {@link Transaction#commit(Engine,
* CloseableIterable)} from an Iceberg remove.
@@ -157,6 +218,56 @@ public static Row generateIcebergCompatWriterV1RemoveAction(
return SingleAction.createRemoveFileSingleAction(removeFileRow);
}

/**
* Create a remove action {@link Row} that can be passed to {@link Transaction#commit(Engine,
* CloseableIterable)} from an Iceberg remove.
*
* @param transactionState the transaction state from the built transaction
* @param fileStatus the file status to create the remove with (contains path, time, size, and
* stats)
* @param partitionValues the partition values for the remove
* @param dataChange whether or not the remove constitutes a dataChange (i.e. delete vs.
* compaction)
* @return remove action row that can be committed to the transaction
* @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled
* @throws UnsupportedOperationException if maxRetries != 0 in the transaction
* @throws KernelException if the table is an append-only table and dataChange=true
* @throws UnsupportedOperationException if the table is partitioned (currently unsupported)
*/
public static Row generateIcebergCompatWriterV3RemoveAction(
Row transactionState,
DataFileStatus fileStatus,
Map<String, Literal> partitionValues,
boolean dataChange) {
Map<String, String> config = TransactionStateRow.getConfiguration(transactionState);

/* ----- Validate that this is a valid usage of this API ----- */
validateIcebergWriterCompatV3Enabled(config);
validateMaxRetriesSetToZero(transactionState);

/* ----- Validate this is valid write given the table's protocol & configurations ----- */
// We only allow removes with dataChange=false when appendOnly=true
if (dataChange && TableConfig.APPEND_ONLY_ENABLED.fromMetadata(config)) {
throw DeltaErrors.cannotModifyAppendOnlyTable(
TransactionStateRow.getTablePath(transactionState));
}

/* --- Validate and update partitionValues ---- */
// Currently we don't support partitioned tables; fail here
blockPartitionedTables(transactionState, partitionValues);

URI tableRoot = new Path(TransactionStateRow.getTablePath(transactionState)).toUri();
// This takes care of relativizing the file path and serializing the file statistics
Row removeFileRow =
convertRemoveDataFileStatus(
TransactionStateRow.getPhysicalSchema(transactionState),
tableRoot,
fileStatus,
partitionValues,
dataChange);
return SingleAction.createRemoveFileSingleAction(removeFileRow);
}
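Mirroring the add path, a sketch of a compaction-style remove with the same placeholder names (not part of the diff); passing dataChange=false also sidesteps the append-only check, since a compaction does not logically delete data:

// Remove the compacted-away file; bytes are rewritten but no rows are deleted.
Row removeAction =
    GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3RemoveAction(
        txnState, oldFileStatus, Collections.emptyMap() /* partitionValues */,
        false /* dataChange */);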

/////////////////////
// Private helpers //
/////////////////////
@@ -177,6 +288,21 @@ private static void validateIcebergWriterCompatV1Enabled(Map<String, String> con
}
}

/**
* Validates that the table feature `icebergWriterCompatV3` is enabled. We restrict these APIs to
* tables with this feature enabled, to prevent unsafe usage of the table features that
* `icebergWriterCompatV3` blocks.
*/
private static void validateIcebergWriterCompatV3Enabled(Map<String, String> config) {
if (!TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.fromMetadata(config)) {
throw new UnsupportedOperationException(
String.format(
"APIs within GenerateIcebergCompatActionUtils are only supported on tables with"
+ " '%s' set to true",
TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey()));
}
}

/**
* Throws an exception if `maxRetries` was not set to 0 in the transaction. We restrict these APIs
* to require `maxRetries = 0` since conflict resolution is not supported for operations other
@@ -27,6 +27,7 @@
import io.delta.kernel.internal.util.SchemaIterable;
import io.delta.kernel.types.*;
import java.util.*;
import java.util.stream.Stream;

/**
* Contains interfaces and common utility classes performing the validations and updates necessary
@@ -55,6 +56,49 @@ abstract class IcebergWriterCompatMetadataValidatorAndUpdater
ColumnMapping.updateColumnMappingMetadataIfNeeded(
inputContext.newMetadata, inputContext.isCreatingNewTable));

/**
* Creates an IcebergCompatRequiredTablePropertyEnforcer for enabling a specific Iceberg
* compatibility version. The enforcer ensures the property is set to "true" and delegates
* validation to the appropriate metadata validator.
*
* @param tableConfigProperty the table configuration property to enforce
* @param postProcessor the version-specific validation and metadata update processor
* @return configured enforcer for the specified Iceberg compatibility version
*/
protected static IcebergCompatRequiredTablePropertyEnforcer<Boolean> createIcebergCompatEnforcer(
TableConfig<Boolean> tableConfigProperty, PostMetadataProcessor postProcessor) {
return new IcebergCompatRequiredTablePropertyEnforcer<>(
tableConfigProperty, (value) -> value, "true", postProcessor);
}
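For reference, a call such as createIcebergCompatEnforcer(TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED, postProcessor) expands to the direct construction below (identity value transform, required value "true"):

// Equivalent to what the factory method builds.
new IcebergCompatRequiredTablePropertyEnforcer<>(
    TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED, (value) -> value, "true", postProcessor);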

/**
* Common set of allowed table features shared across all Iceberg writer compatibility versions.
* This includes the incompatible legacy features (invariants, changeDataFeed, checkConstraints,
* identityColumns, generatedColumns) because they may be present in the table protocol even when
* they are not in use. In later checks we validate that these incompatible features are inactive
* in the table. See the protocol spec for more details.
*/
protected static final Set<TableFeature> COMMON_ALLOWED_FEATURES =
Stream.of(
// Incompatible legacy table features
INVARIANTS_W_FEATURE,
CHANGE_DATA_FEED_W_FEATURE,
CONSTRAINTS_W_FEATURE,
IDENTITY_COLUMNS_W_FEATURE,
GENERATED_COLUMNS_W_FEATURE,
// Compatible table features
APPEND_ONLY_W_FEATURE,
COLUMN_MAPPING_RW_FEATURE,
DOMAIN_METADATA_W_FEATURE,
VACUUM_PROTOCOL_CHECK_RW_FEATURE,
CHECKPOINT_V2_RW_FEATURE,
IN_COMMIT_TIMESTAMP_W_FEATURE,
CLUSTERING_W_FEATURE,
TIMESTAMP_NTZ_RW_FEATURE,
TYPE_WIDENING_RW_FEATURE,
TYPE_WIDENING_RW_PREVIEW_FEATURE)
.collect(toSet());

protected static IcebergCompatCheck createUnsupportedFeaturesCheck(
IcebergWriterCompatMetadataValidatorAndUpdater instance) {
return (inputContext) -> {
@@ -209,6 +253,16 @@
}
};

protected static final List<IcebergCompatCheck> COMMON_CHECKS =
Arrays.asList(
UNSUPPORTED_TYPES_CHECK,
PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK,
INVARIANTS_INACTIVE_CHECK,
CHANGE_DATA_FEED_INACTIVE_CHECK,
CHECK_CONSTRAINTS_INACTIVE_CHECK,
IDENTITY_COLUMNS_INACTIVE_CHECK,
GENERATED_COLUMNS_INACTIVE_CHECK);

@Override
abstract String compatFeatureName();
