From 9fe53388249cbaa1047251e3550dcc28d7692a8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Tue, 22 Apr 2025 18:56:20 +0200 Subject: [PATCH 1/3] Implement source-ids to deal with multi arguments transforms --- .palantir/revapi.yml | 11 +++ .../org/apache/iceberg/PartitionField.java | 35 +++++++-- .../org/apache/iceberg/PartitionSpec.java | 45 +++++++---- .../apache/iceberg/UnboundPartitionSpec.java | 35 ++++++--- .../org/apache/iceberg/transforms/Bucket.java | 13 ++++ .../org/apache/iceberg/transforms/Dates.java | 13 ++++ .../org/apache/iceberg/transforms/Days.java | 8 ++ .../org/apache/iceberg/transforms/Hours.java | 8 ++ .../apache/iceberg/transforms/Identity.java | 13 ++++ .../org/apache/iceberg/transforms/Months.java | 8 ++ .../iceberg/transforms/TimeTransform.java | 8 ++ .../apache/iceberg/transforms/Timestamps.java | 13 ++++ .../apache/iceberg/transforms/Transform.java | 17 +++++ .../apache/iceberg/transforms/Truncate.java | 13 ++++ .../iceberg/transforms/UnknownTransform.java | 13 ++++ .../iceberg/transforms/VoidTransform.java | 14 ++++ .../org/apache/iceberg/transforms/Years.java | 8 ++ .../java/org/apache/iceberg/TestHelpers.java | 4 +- .../iceberg/TestPartitionSpecValidation.java | 12 ++- .../apache/iceberg/PartitionSpecParser.java | 44 ++++++++++- .../iceberg/TestPartitionSpecParser.java | 74 +++++++++++++++++++ 21 files changed, 368 insertions(+), 41 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index d9318b5cffcb..235edae27144 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1177,6 +1177,17 @@ acceptedBreaks: old: "class org.apache.iceberg.Metrics" new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.PartitionField" + new: "class org.apache.iceberg.PartitionField" + justification: "Introduce source-ids" + - code: "java.method.addedToInterface" + new: 
"method boolean org.apache.iceberg.transforms.Transform::canTransform(java.util.List)" + justification: "Add multi-args support in transforms" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.types.Type org.apache.iceberg.transforms.Transform::getResultType(java.util.List)" + justification: "Add multi-args support in transforms" org.apache.iceberg:iceberg-core: - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ diff --git a/api/src/main/java/org/apache/iceberg/PartitionField.java b/api/src/main/java/org/apache/iceberg/PartitionField.java index 3ed765a89834..044779c1bdae 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionField.java +++ b/api/src/main/java/org/apache/iceberg/PartitionField.java @@ -19,26 +19,40 @@ package org.apache.iceberg; import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.transforms.Transform; /** Represents a single field in a {@link PartitionSpec}. */ public class PartitionField implements Serializable { - private final int sourceId; + private final List sourceIds; private final int fieldId; private final String name; private final Transform transform; PartitionField(int sourceId, int fieldId, String name, Transform transform) { - this.sourceId = sourceId; + this(List.of(sourceId), fieldId, name, transform); + } + + PartitionField(List sourceIds, int fieldId, String name, Transform transform) { + this.sourceIds = sourceIds; this.fieldId = fieldId; this.name = name; this.transform = transform; } - /** Returns the field id of the source field in the {@link PartitionSpec spec's} table schema. */ + /** Returns the source field in the {@link PartitionSpec spec's} table schema. 
*/ public int sourceId() { - return sourceId; + Preconditions.checkArgument( + sourceIds.size() == 1, "Cannot use sourceId() on a transform with multiple arguments"); + return sourceIds.get(0); + } + + /** Returns source ids field in the {@link PartitionSpec spec's} table schema. */ + public List sourceIds() { + return sourceIds; } /** Returns the partition field id across all the table metadata's partition specs. */ @@ -58,7 +72,14 @@ public String name() { @Override public String toString() { - return fieldId + ": " + name + ": " + transform + "(" + sourceId + ")"; + return fieldId + + ": " + + name + + ": " + + transform + + "(" + + sourceIds.stream().map(String::valueOf).collect(Collectors.joining(", ")) + + ")"; } @Override @@ -70,7 +91,7 @@ public boolean equals(Object other) { } PartitionField that = (PartitionField) other; - return sourceId == that.sourceId + return sourceIds.equals(that.sourceIds) && fieldId == that.fieldId && name.equals(that.name) && transform.toString().equals(that.transform.toString()); @@ -78,6 +99,6 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hashCode(sourceId, fieldId, name, transform); + return Objects.hashCode(sourceIds, fieldId, name, transform); } } diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index f059c928a967..9a80324d7ec2 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -106,7 +106,7 @@ public UnboundPartitionSpec toUnbound() { for (PartitionField field : fields) { builder.addField( - field.transform().toString(), field.sourceId(), field.fieldId(), field.name()); + field.transform().toString(), field.sourceIds(), field.fieldId(), field.name()); } return builder.build(); @@ -608,17 +608,26 @@ public Builder alwaysNull(String sourceName) { // add a partition field with an auto-increment partition field id starting 
from // PARTITION_DATA_ID_START + Builder add(List sourceIds, String name, Transform transform) { + return add(sourceIds, nextFieldId(), name, transform); + } + Builder add(int sourceId, String name, Transform transform) { - return add(sourceId, nextFieldId(), name, transform); + return add(List.of(sourceId), name, transform); } - Builder add(int sourceId, int fieldId, String name, Transform transform) { - checkAndAddPartitionName(name, sourceId); - fields.add(new PartitionField(sourceId, fieldId, name, transform)); + Builder add(List sourceIds, int fieldId, String name, Transform transform) { + // we use the first entry in the source-ids list here + checkAndAddPartitionName(name, sourceIds.get(0)); + fields.add(new PartitionField(sourceIds, fieldId, name, transform)); lastAssignedFieldId.getAndAccumulate(fieldId, Math::max); return this; } + Builder add(int sourceId, int fieldId, String name, Transform transform) { + return add(List.of(sourceId), fieldId, name, transform); + } + public PartitionSpec build() { return build(false); } @@ -641,10 +650,13 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) { static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) { final Map parents = TypeUtil.indexParents(schema.asStruct()); for (PartitionField field : spec.fields) { - Type sourceType = schema.findType(field.sourceId()); + List sourceTypes = Lists.newArrayList(); + for (int sourceId : field.sourceIds()) { + sourceTypes.add(schema.findType(sourceId)); + } Transform transform = field.transform(); // In the case the underlying field is dropped, we cannot check if they are compatible - if (allowMissingFields && sourceType == null) { + if (allowMissingFields) { continue; } // In the case of a Version 1 partition-spec field gets deleted, @@ -654,15 +666,20 @@ static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowM // checks if (!transform.equals(Transforms.alwaysNull())) { 
ValidationException.check( - sourceType != null, "Cannot find source column for partition field: %s", field); - ValidationException.check( - sourceType.isPrimitiveType(), - "Cannot partition by non-primitive source field: %s", - sourceType); + !sourceTypes.isEmpty(), "Cannot find source column for partition field: %s", field); + for (Type sourceType : sourceTypes) { + ValidationException.check( + sourceType != null, "Cannot find source column for partition field: %s", field); + ValidationException.check( + sourceType.isPrimitiveType(), + "Cannot partition by non-primitive source field: %s (%s)", + field, + sourceType); + } ValidationException.check( - transform.canTransform(sourceType), + transform.canTransform(sourceTypes), "Invalid source type %s for transform: %s", - sourceType, + sourceTypes, transform); // The only valid parent types for a PartitionField are StructTypes. This must be checked // recursively. diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java index 30b3cce35fc8..db20e3b6eef1 100644 --- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; @@ -58,7 +59,7 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) { PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId); for (UnboundPartitionField field : fields) { - Type fieldType = schema.findType(field.sourceId); + Type fieldType = schema.findType(field.sourceIds.get(0)); Transform transform; if (fieldType != null) { transform = Transforms.fromString(fieldType, field.transform.toString()); @@ 
-66,9 +67,9 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) { transform = Transforms.fromString(field.transform.toString()); } if (field.partitionId != null) { - builder.add(field.sourceId, field.partitionId, field.name, transform); + builder.add(field.sourceIds, field.partitionId, field.name, transform); } else { - builder.add(field.sourceId, field.name, transform); + builder.add(field.sourceIds, field.name, transform); } } @@ -92,14 +93,23 @@ Builder withSpecId(int newSpecId) { return this; } + Builder addField( + String transformAsString, List sourceIds, int partitionId, String name) { + fields.add(new UnboundPartitionField(transformAsString, sourceIds, partitionId, name)); + return this; + } + Builder addField(String transformAsString, int sourceId, int partitionId, String name) { - fields.add(new UnboundPartitionField(transformAsString, sourceId, partitionId, name)); + return addField(transformAsString, List.of(sourceId), partitionId, name); + } + + Builder addField(String transformAsString, List sourceIds, String name) { + fields.add(new UnboundPartitionField(transformAsString, sourceIds, null, name)); return this; } Builder addField(String transformAsString, int sourceId, String name) { - fields.add(new UnboundPartitionField(transformAsString, sourceId, null, name)); - return this; + return addField(transformAsString, List.of(sourceId), name); } UnboundPartitionSpec build() { @@ -109,7 +119,7 @@ UnboundPartitionSpec build() { static class UnboundPartitionField { private final Transform transform; - private final int sourceId; + private final List sourceIds; private final Integer partitionId; private final String name; @@ -122,7 +132,12 @@ public String transformAsString() { } public int sourceId() { - return sourceId; + Preconditions.checkArgument(sourceIds.size() >= 1, "At least one source is expected"); + return sourceIds.get(0); + } + + public List sourceIds() { + return sourceIds; } public Integer partitionId() { @@ -134,9 +149,9 @@ public 
String name() { } private UnboundPartitionField( - String transformAsString, int sourceId, Integer partitionId, String name) { + String transformAsString, List sourceIds, Integer partitionId, String name) { this.transform = Transforms.fromString(transformAsString); - this.sourceId = sourceId; + this.sourceIds = sourceIds; this.partitionId = partitionId; this.name = name; } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java index 2b2439e3ed0a..cb781aeba602 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.List; import java.util.UUID; import java.util.function.Function; import org.apache.iceberg.expressions.BoundPredicate; @@ -135,6 +136,12 @@ public boolean canTransform(Type type) { return false; } + @Override + public boolean canTransform(List types) { + Preconditions.checkArgument(types.size() == 1, "Only one type is accepted"); + return canTransform(types.get(0)); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -206,6 +213,12 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + private static class BucketInteger extends Bucket implements SerializableFunction { diff --git a/api/src/main/java/org/apache/iceberg/transforms/Dates.java b/api/src/main/java/org/apache/iceberg/transforms/Dates.java index 841e6dfa3a51..de5c3e5fa1f4 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Dates.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Dates.java @@ -20,6 +20,7 @@ import com.google.errorprone.annotations.Immutable; 
import java.time.temporal.ChronoUnit; +import java.util.List; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundTransform; import org.apache.iceberg.expressions.Expression; @@ -97,6 +98,12 @@ public boolean canTransform(Type type) { return type.typeId() == Type.TypeID.DATE; } + @Override + public boolean canTransform(List types) { + Preconditions.checkArgument(types.size() == 1, "Only one type is accepted"); + return canTransform(types.get(0)); + } + @Override public Type getResultType(Type sourceType) { if (granularity == ChronoUnit.DAYS) { @@ -105,6 +112,12 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + ChronoUnit granularity() { return granularity; } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Days.java b/api/src/main/java/org/apache/iceberg/transforms/Days.java index e2b829b86662..cdd517b4b8a7 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Days.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Days.java @@ -20,6 +20,8 @@ import java.io.ObjectStreamException; import java.time.temporal.ChronoUnit; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -47,6 +49,12 @@ public Type getResultType(Type sourceType) { return Types.DateType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + @Override public String toHumanString(Type alwaysDate, Integer value) { return value != null ? 
TransformUtil.humanDay(value) : "null"; diff --git a/api/src/main/java/org/apache/iceberg/transforms/Hours.java b/api/src/main/java/org/apache/iceberg/transforms/Hours.java index 2ff79f6a66a7..7af296c255f8 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Hours.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Hours.java @@ -20,6 +20,8 @@ import java.io.ObjectStreamException; import java.time.temporal.ChronoUnit; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -53,6 +55,12 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + @Override public String toHumanString(Type alwaysInt, Integer value) { return value != null ? TransformUtil.humanHour(value) : "null"; diff --git a/api/src/main/java/org/apache/iceberg/transforms/Identity.java b/api/src/main/java/org/apache/iceberg/transforms/Identity.java index f52e63633578..dcf8e59ad349 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Identity.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Identity.java @@ -19,6 +19,7 @@ package org.apache.iceberg.transforms; import java.io.ObjectStreamException; +import java.util.List; import java.util.Set; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.Expressions; @@ -105,6 +106,12 @@ public boolean canTransform(Type maybePrimitive) { return maybePrimitive.isPrimitiveType(); } + @Override + public boolean canTransform(List types) { + Preconditions.checkArgument(types.size() == 1, "Only one type is accepted"); + return canTransform(types.get(0)); + } + /** * Returns a human-readable String representation of a transformed value. 
* @@ -128,6 +135,12 @@ public Type getResultType(Type sourceType) { return sourceType; } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + @Override public boolean preservesOrder() { return true; diff --git a/api/src/main/java/org/apache/iceberg/transforms/Months.java b/api/src/main/java/org/apache/iceberg/transforms/Months.java index 73ec50e5dd9a..d8103414cd2c 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Months.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Months.java @@ -20,6 +20,8 @@ import java.io.ObjectStreamException; import java.time.temporal.ChronoUnit; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -47,6 +49,12 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + @Override public String toHumanString(Type alwaysInt, Integer value) { return value != null ? 
TransformUtil.humanMonth(value) : "null"; diff --git a/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java b/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java index c348fda52b02..49f1c6641788 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java +++ b/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java @@ -19,9 +19,11 @@ package org.apache.iceberg.transforms; import java.time.temporal.ChronoUnit; +import java.util.List; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundTransform; import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.util.SerializableFunction; @@ -81,6 +83,12 @@ public boolean canTransform(Type type) { || type.typeId() == Type.TypeID.TIMESTAMP_NANO; } + @Override + public boolean canTransform(List types) { + Preconditions.checkArgument(types.size() == 1, "Only one source type is accepted"); + return canTransform(types.get(0)); + } + @Override public UnboundPredicate project(String name, BoundPredicate predicate) { if (predicate.term() instanceof BoundTransform) { diff --git a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java index 845725219438..274f26146fab 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java @@ -20,6 +20,7 @@ import com.google.errorprone.annotations.Immutable; import java.time.temporal.ChronoUnit; +import java.util.List; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundTransform; import org.apache.iceberg.expressions.Expression; @@ -130,6 +131,12 @@ public boolean canTransform(Type type) { return type.typeId() == Type.TypeID.TIMESTAMP || type.typeId() == 
Type.TypeID.TIMESTAMP_NANO; } + @Override + public boolean canTransform(List types) { + Preconditions.checkArgument(types.size() == 1, "Only one type is accepted"); + return canTransform(types.get(0)); + } + @Override public Type getResultType(Type sourceType) { if (granularity == ChronoUnit.DAYS) { @@ -138,6 +145,12 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + ChronoUnit granularity() { return granularity; } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Transform.java b/api/src/main/java/org/apache/iceberg/transforms/Transform.java index 78312b58b12f..6e2b8c904e23 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Transform.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Transform.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.List; import java.util.function.Function; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.UnboundPredicate; @@ -68,6 +69,14 @@ default SerializableFunction bind(Type type) { */ boolean canTransform(Type type); + /** + * Checks whether this function can be applied to the given list of {@link Type}. + * + * @param types the list of types + * @return true if this transform can be applied to the types, false otherwise + */ + boolean canTransform(List types); + /** * Returns the {@link Type} produced by this transform given a source type. * @@ -76,6 +85,14 @@ default SerializableFunction bind(Type type) { */ Type getResultType(Type sourceType); + /** + * Returns the {@link Type} produced by this transform given a list of source types. 
+ * + * @param sourceTypes the list of source types + * @return the result type created by the apply method for the given types + */ + Type getResultType(List sourceTypes); + /** * Whether the transform preserves the order of values (is monotonic). * diff --git a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java index a111e4ca394b..c8bebf75628e 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.List; import java.util.function.Function; import org.apache.iceberg.expressions.BoundLiteralPredicate; import org.apache.iceberg.expressions.BoundPredicate; @@ -103,6 +104,12 @@ public boolean canTransform(Type type) { return false; } + @Override + public boolean canTransform(List types) { + Preconditions.checkArgument(types.size() == 1, "Only one type is accepted"); + return canTransform(types.get(0)); + } + @Override public UnboundPredicate project(String name, BoundPredicate predicate) { Truncate bound = (Truncate) get(predicate.term().type(), width); @@ -120,6 +127,12 @@ public Type getResultType(Type sourceType) { return sourceType; } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + @Override public boolean preservesOrder() { return true; diff --git a/api/src/main/java/org/apache/iceberg/transforms/UnknownTransform.java b/api/src/main/java/org/apache/iceberg/transforms/UnknownTransform.java index aebd3445e36e..de2c85b08db6 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/UnknownTransform.java +++ b/api/src/main/java/org/apache/iceberg/transforms/UnknownTransform.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.transforms; +import 
java.util.List; import java.util.Objects; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.UnboundPredicate; @@ -60,12 +61,24 @@ public boolean canTransform(Type type) { return true; } + @Override + public boolean canTransform(List types) { + // assume the transform function can be applied for any type + return true; + } + @Override public Type getResultType(Type type) { // the actual result type is not known return Types.StringType.get(); } + @Override + public Type getResultType(List types) { + // the actual result type is not known + return Types.StringType.get(); + } + @Override public UnboundPredicate project(String name, BoundPredicate predicate) { return null; diff --git a/api/src/main/java/org/apache/iceberg/transforms/VoidTransform.java b/api/src/main/java/org/apache/iceberg/transforms/VoidTransform.java index b46780244faf..a0f5b7f9c1db 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/VoidTransform.java +++ b/api/src/main/java/org/apache/iceberg/transforms/VoidTransform.java @@ -20,6 +20,7 @@ import java.io.ObjectStreamException; import java.io.Serializable; +import java.util.List; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.types.Type; @@ -72,11 +73,24 @@ public boolean canTransform(Type type) { return true; } + @Override + public boolean canTransform(List types) { + return true; + } + @Override public Type getResultType(Type sourceType) { return sourceType; } + @Override + public Type getResultType(List sourceTypes) { + if (!sourceTypes.isEmpty()) { + return sourceTypes.get(0); + } + return null; + } + @Override public UnboundPredicate projectStrict(String name, BoundPredicate predicate) { return null; diff --git a/api/src/main/java/org/apache/iceberg/transforms/Years.java b/api/src/main/java/org/apache/iceberg/transforms/Years.java index 2920a37dc692..159d1e17eeb5 100644 --- 
a/api/src/main/java/org/apache/iceberg/transforms/Years.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Years.java @@ -20,6 +20,8 @@ import java.io.ObjectStreamException; import java.time.temporal.ChronoUnit; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -47,6 +49,12 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public Type getResultType(List sourceTypes) { + Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted"); + return getResultType(sourceTypes.get(0)); + } + @Override public String toHumanString(Type alwaysInt, Integer value) { return value != null ? TransformUtil.humanYear(value) : "null"; diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index a21e3752e84d..7a283f550eac 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -768,12 +768,12 @@ public ExpectedSpecBuilder withSpecId(int newSpecId) { public ExpectedSpecBuilder addField( String transformAsString, int sourceId, int partitionId, String name) { - unboundPartitionSpecBuilder.addField(transformAsString, sourceId, partitionId, name); + unboundPartitionSpecBuilder.addField(transformAsString, List.of(sourceId), partitionId, name); return this; } public ExpectedSpecBuilder addField(String transformAsString, int sourceId, String name) { - unboundPartitionSpecBuilder.addField(transformAsString, sourceId, name); + unboundPartitionSpecBuilder.addField(transformAsString, List.of(sourceId), name); return this; } diff --git a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java index b8e16a9ee45e..61481f4667ef 100644 --- 
a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java +++ b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java @@ -334,10 +334,14 @@ public void testUnsupported(int fieldId, String partitionName, String expectedEr private static Object[][] unsupportedFieldsProvider() { return new Object[][] { - {7, "variant_partition1", "Cannot partition by non-primitive source field: variant"}, - {8, "geom_partition1", "Invalid source type geometry for transform: bucket[5]"}, - {9, "geog_partition1", "Invalid source type geography for transform: bucket[5]"}, - {10, "unknown_partition1", "Invalid source type unknown for transform: bucket[5]"} + { + 7, + "variant_partition1", + "Cannot partition by non-primitive source field: 1005: variant_partition1: bucket[5](7) (variant)" + }, + {8, "geom_partition1", "Invalid source type [geometry] for transform: bucket[5]"}, + {9, "geog_partition1", "Invalid source type [geography] for transform: bucket[5]"}, + {10, "unknown_partition1", "Invalid source type [unknown] for transform: bucket[5]"} }; } diff --git a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java index 7becf0c62943..f09e5b30fadf 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java +++ b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java @@ -24,7 +24,9 @@ import com.github.benmanes.caffeine.cache.Caffeine; import java.io.IOException; import java.util.Iterator; +import java.util.List; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.JsonUtil; import org.apache.iceberg.util.Pair; @@ -35,6 +37,7 @@ private PartitionSpecParser() {} private static final String SPEC_ID = "spec-id"; private static final String FIELDS = "fields"; private static final String SOURCE_ID = 
"source-id"; + private static final String SOURCE_IDS = "source-ids"; private static final String FIELD_ID = "field-id"; private static final String TRANSFORM = "transform"; private static final String NAME = "name"; @@ -98,10 +101,22 @@ static void toJsonFields(UnboundPartitionSpec spec, JsonGenerator generator) thr generator.writeStartObject(); generator.writeStringField(NAME, field.name()); generator.writeStringField(TRANSFORM, field.transformAsString()); - generator.writeNumberField(SOURCE_ID, field.sourceId()); + if (field.sourceIds().size() == 1) { + generator.writeNumberField(SOURCE_ID, field.sourceId()); + } else { + generator.writeFieldName(SOURCE_IDS); + generator.writeStartArray(); + for (Integer value : field.sourceIds()) { + generator.writeNumber(value); + } + + generator.writeEndArray(); + } + generator.writeNumberField(FIELD_ID, field.partitionId()); generator.writeEndObject(); } + generator.writeEndArray(); } @@ -132,15 +147,36 @@ private static void buildFromJsonFields(UnboundPartitionSpec.Builder builder, Js String name = JsonUtil.getString(NAME, element); String transform = JsonUtil.getString(TRANSFORM, element); - int sourceId = JsonUtil.getInt(SOURCE_ID, element); + Preconditions.checkArgument( + !(element.has(SOURCE_ID) && element.has(SOURCE_IDS)), + "Cannot parse partition field, only " + + SOURCE_ID + + " or " + + SOURCE_IDS + + " are accepted exclusively, not both"); + Preconditions.checkArgument( + element.has(SOURCE_ID) || element.has(SOURCE_IDS), + "Cannot parse partition field, either " + + SOURCE_ID + + " or " + + SOURCE_IDS + + " has to be present"); + List sourceIds; + Integer sourceId = JsonUtil.getIntOrNull(SOURCE_ID, element); + if (sourceId != null) { + sourceIds = Lists.newArrayList(); + sourceIds.add(sourceId); + } else { + sourceIds = JsonUtil.getIntegerList(SOURCE_IDS, element); + } // partition field ids are missing in old PartitionSpec, they always auto-increment from // PARTITION_DATA_ID_START if (element.has(FIELD_ID)) { 
- builder.addField(transform, sourceId, JsonUtil.getInt(FIELD_ID, element), name); + builder.addField(transform, sourceIds, JsonUtil.getInt(FIELD_ID, element), name); fieldIdCount++; } else { - builder.addField(transform, sourceId, name); + builder.addField(transform, sourceIds, name); } } diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java index e97d2f98b416..184e4f67fde1 100644 --- a/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java +++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Arrays; import java.util.List; @@ -95,6 +96,79 @@ public void testFromJsonWithFieldId() { assertThat(spec.fields().get(1).fieldId()).isEqualTo(1000); } + @TestTemplate + public void testFromJsonWithSourceIds() { + String specString = + "{\n" + + " \"spec-id\" : 1,\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"void\",\n" + + " \"transform\" : \"void\",\n" + + " \"source-ids\" : [ 1, 2 ],\n" + + " \"field-id\" : 1001\n" + + " }]\n" + + "}"; + + PartitionSpec spec = PartitionSpecParser.fromJson(table.schema(), specString); + + assertThat(spec.fields()).hasSize(1); + assertThat(spec.fields().get(0).sourceIds()).hasSize(2); + } + + @TestTemplate + public void testFromJsonWithBothSourceIdAndSourceIds() { + String specString = + "{\n" + + " \"spec-id\" : 1,\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"id_bucket\",\n" + + " \"transform\" : \"bucket[8]\",\n" + + " \"source-id\" : 1,\n" + + " \"source-ids\" : [ 1 ],\n" + + " \"field-id\" : 1001\n" + + " }, {\n" + + " \"name\" : \"data_bucket\",\n" + + " \"transform\" : \"bucket[16]\",\n" + + " \"source-id\" : 2,\n" + + " \"source-ids\" : [ 2 ],\n" + + " \"field-id\" : 1000\n" + + " } ]\n" + + "}"; + + assertThatThrownBy( 
+ () -> { + PartitionSpecParser.fromJson(table.schema(), specString); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot parse partition field, only source-id or source-ids are accepted exclusively, not both"); + } + + @TestTemplate + public void testFromJsonWithoutSourceIdAndSourceIds() { + String specString = + "{\n" + + " \"spec-id\" : 1,\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"id_bucket\",\n" + + " \"transform\" : \"bucket[8]\",\n" + + " \"field-id\" : 1001\n" + + " }, {\n" + + " \"name\" : \"data_bucket\",\n" + + " \"transform\" : \"bucket[16]\",\n" + + " \"field-id\" : 1000\n" + + " } ]\n" + + "}"; + + assertThatThrownBy( + () -> { + PartitionSpecParser.fromJson(table.schema(), specString); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot parse partition field, either source-id or source-ids has to be present"); + } + @TestTemplate public void testFromJsonWithoutFieldId() { String specString = From 8279e9fa3282ec785d6cd8013a722f9d16c943d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Mon, 22 Sep 2025 17:18:04 +0200 Subject: [PATCH 2/3] Add new line after blocks to be consistent with recommended style --- api/src/main/java/org/apache/iceberg/PartitionSpec.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index 9a80324d7ec2..d5fb34cdbf10 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -654,11 +654,13 @@ static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowM for (int sourceId : field.sourceIds()) { sourceTypes.add(schema.findType(sourceId)); } + Transform transform = field.transform(); // In the case the underlying field is dropped, we cannot check if they are compatible - if (allowMissingFields) { + if (allowMissingFields && 
sourceTypes.isEmpty()) { continue; } + // In the case of a Version 1 partition-spec field gets deleted, // it is replaced with a void transform, see: // https://iceberg.apache.org/spec/#partition-transforms @@ -676,6 +678,7 @@ static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowM field, sourceType); } + ValidationException.check( transform.canTransform(sourceTypes), "Invalid source type %s for transform: %s", @@ -700,6 +703,7 @@ static boolean hasSequentialIds(PartitionSpec spec) { return false; } } + return true; } } From c3ce9fced868d997f96b31da7c4fc0d5f41e289a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Mon, 22 Sep 2025 17:23:28 +0200 Subject: [PATCH 3/3] Add checkNotNull precondition when using sourceIds --- api/src/main/java/org/apache/iceberg/PartitionSpec.java | 4 +++- .../main/java/org/apache/iceberg/UnboundPartitionSpec.java | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index d5fb34cdbf10..86a6659c3939 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -617,6 +617,8 @@ Builder add(int sourceId, String name, Transform transform) { } Builder add(List sourceIds, int fieldId, String name, Transform transform) { + Preconditions.checkNotNull(sourceIds, "sourceIds is required"); + Preconditions.checkNotNull(sourceIds.get(0), "sourceIds needs at least one field"); // we use the first entry in the source-ids list here checkAndAddPartitionName(name, sourceIds.get(0)); fields.add(new PartitionField(sourceIds, fieldId, name, transform)); @@ -703,7 +705,7 @@ static boolean hasSequentialIds(PartitionSpec spec) { return false; } } - + return true; } } diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java index 
db20e3b6eef1..a4ca04f1161e 100644 --- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java @@ -95,6 +95,7 @@ Builder withSpecId(int newSpecId) { Builder addField( String transformAsString, List sourceIds, int partitionId, String name) { + Preconditions.checkNotNull(sourceIds, "sourceIds is required"); fields.add(new UnboundPartitionField(transformAsString, sourceIds, partitionId, name)); return this; } @@ -104,6 +105,7 @@ Builder addField(String transformAsString, int sourceId, int partitionId, String } Builder addField(String transformAsString, List sourceIds, String name) { + Preconditions.checkNotNull(sourceIds, "sourceIds is required"); fields.add(new UnboundPartitionField(transformAsString, sourceIds, null, name)); return this; }