Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,17 @@ acceptedBreaks:
old: "class org.apache.iceberg.Metrics"
new: "class org.apache.iceberg.Metrics"
justification: "Java serialization across versions is not guaranteed"
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.PartitionField"
new: "class org.apache.iceberg.PartitionField"
justification: "Introduce source-ids"
- code: "java.method.addedToInterface"
new: "method boolean org.apache.iceberg.transforms.Transform<S, T>::canTransform(java.util.List<org.apache.iceberg.types.Type>)"
justification: "Add multi-args support in transforms"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.types.Type org.apache.iceberg.transforms.Transform<S,\
\ T>::getResultType(java.util.List<org.apache.iceberg.types.Type>)"
justification: "Add multi-args support in transforms"
org.apache.iceberg:iceberg-core:
- code: "java.method.removed"
old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\
Expand Down
35 changes: 28 additions & 7 deletions api/src/main/java/org/apache/iceberg/PartitionField.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,40 @@
package org.apache.iceberg;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.transforms.Transform;

/** Represents a single field in a {@link PartitionSpec}. */
public class PartitionField implements Serializable {
private final int sourceId;
private final List<Integer> sourceIds;
private final int fieldId;
private final String name;
private final Transform<?, ?> transform;

/**
 * Single-source convenience constructor.
 *
 * <p>Delegates to the multi-source constructor with a one-element source-id list.
 *
 * @param sourceId id of the source field in the table schema
 * @param fieldId partition field id, unique across the table's partition specs
 * @param name partition field name
 * @param transform transform applied to the source field
 */
PartitionField(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
  // Must be the first statement; the stale pre-refactor assignment to the removed
  // `sourceId` field was a leftover diff artifact and has been dropped.
  this(List.of(sourceId), fieldId, name, transform);
}

/**
 * Multi-source constructor supporting multi-argument transforms.
 *
 * @param sourceIds ids of the source fields in the table schema; must be non-null and non-empty
 * @param fieldId partition field id, unique across the table's partition specs
 * @param name partition field name
 * @param transform transform applied to the source fields
 */
PartitionField(List<Integer> sourceIds, int fieldId, String name, Transform<?, ?> transform) {
  // Defensive copy keeps this class immutable (and serializable-safe) even if the
  // caller's list is mutated later; List.copyOf also rejects null lists/elements early.
  this.sourceIds = List.copyOf(sourceIds);
  this.fieldId = fieldId;
  this.name = name;
  this.transform = transform;
}

/**
 * Returns the id of the single source field in the {@link PartitionSpec spec's} table schema.
 *
 * <p>Legacy single-argument accessor; use {@link #sourceIds()} for multi-argument transforms.
 *
 * @throws IllegalArgumentException if this field is built from more than one source
 */
public int sourceId() {
  // Stale pre-refactor `return sourceId;` diff artifact removed — the single-id
  // field no longer exists; the value now comes from the one-element list.
  Preconditions.checkArgument(
      sourceIds.size() == 1, "Cannot use sourceId() on a transform with multiple arguments");
  return sourceIds.get(0);
}

/** Returns the ids of the source fields in the {@link PartitionSpec spec's} table schema. */
// NOTE(review): this returns the internal list directly; callers must not mutate it.
// Consider returning an immutable copy — confirm whether existing callers rely on identity.
public List<Integer> sourceIds() {
return sourceIds;
}

/** Returns the partition field id across all the table metadata's partition specs. */
Expand All @@ -58,7 +72,14 @@ public String name() {

/** Renders as {@code fieldId: name: transform(sourceId1, sourceId2, ...)}. */
@Override
public String toString() {
  // Stale pre-refactor single-sourceId return removed (diff artifact); the source ids
  // are joined with ", " so single-source fields render exactly as before.
  return fieldId
      + ": "
      + name
      + ": "
      + transform
      + "("
      + sourceIds.stream().map(String::valueOf).collect(Collectors.joining(", "))
      + ")";
}

@Override
Expand All @@ -70,14 +91,14 @@ public boolean equals(Object other) {
}

PartitionField that = (PartitionField) other;
return sourceId == that.sourceId
return sourceIds.equals(that.sourceIds)
&& fieldId == that.fieldId
&& name.equals(that.name)
&& transform.toString().equals(that.transform.toString());
}

@Override
public int hashCode() {
  // Keep consistent with equals(): sourceIds replaces the old single sourceId
  // (stale pre-refactor return line removed — it was a diff artifact).
  return Objects.hashCode(sourceIds, fieldId, name, transform);
}
}
51 changes: 37 additions & 14 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public UnboundPartitionSpec toUnbound() {

for (PartitionField field : fields) {
builder.addField(
field.transform().toString(), field.sourceId(), field.fieldId(), field.name());
field.transform().toString(), field.sourceIds(), field.fieldId(), field.name());
}

return builder.build();
Expand Down Expand Up @@ -608,17 +608,28 @@ public Builder alwaysNull(String sourceName) {

// add a partition field with an auto-increment partition field id starting from
// PARTITION_DATA_ID_START
// Multi-source variant: builds a partition field from one or more source columns,
// assigning the next auto-incremented partition field id.
Builder add(List<Integer> sourceIds, String name, Transform<?, ?> transform) {
return add(sourceIds, nextFieldId(), name, transform);
}

// Single-source convenience overload; wraps the id and delegates to the list variant.
// (Stale pre-refactor delegate to add(sourceId, nextFieldId(), ...) removed — diff artifact.)
Builder add(int sourceId, String name, Transform<?, ?> transform) {
  return add(List.of(sourceId), name, transform);
}

Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
checkAndAddPartitionName(name, sourceId);
fields.add(new PartitionField(sourceId, fieldId, name, transform));
Builder add(List<Integer> sourceIds, int fieldId, String name, Transform<?, ?> transform) {
Preconditions.checkNotNull(sourceIds, "sourceIds is required");
Preconditions.checkNotNull(sourceIds.get(0), "sourceIds needs at least one field");
// we use the first entry in the source-ids list here
checkAndAddPartitionName(name, sourceIds.get(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work for multi arg partition field?
Do we need logic that accepts the sourceIds and resolves the name of a multi-arg transform?

Copy link
Member Author

@jbonofre jbonofre Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sourceId here is to resolve conflict (no impact on name):

        if (sourceColumnId != null) {
          // for identity transform case we allow conflicts between partition and schema field name
          // as
          //   long as they are sourced from the same schema field
          Preconditions.checkArgument(
              schemaField == null || schemaField.fieldId() == sourceColumnId,
              "Cannot create identity partition sourced from different field in schema: %s",
              name);

Let me check if we should compare fieldId with each column id.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current logic looks like

if (check conflicts). {
  if identity {
    make sure we aren't using a different field name for this identity transform
  } else {
   make sure we aren't matching any other column name
  }
}
Make sure it's not empty
Make sure we haven't already used this name for another partition
Add partition

I have no idea why those last 2 checks aren't part of the "if check conflicts" branch

Anyyyyyyway. I think this whole validation probably should be rewritten. The first branch we are checking basically based a lot of implicit assumptions when we should just be passing in the transform. But we don't have to do any of that now.

For now I think we should pass in sourceIds and just have the first branch include a "if sourceIds.length == 1"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let me refactor this part.

fields.add(new PartitionField(sourceIds, fieldId, name, transform));
lastAssignedFieldId.getAndAccumulate(fieldId, Math::max);
return this;
}

Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
return add(List.of(sourceId), fieldId, name, transform);
}

// Builds the partition spec with default validation.
// NOTE(review): delegates to build(false) — presumably allowMissingFields=false
// (strict source-column checking); confirm against build(boolean).
public PartitionSpec build() {
return build(false);
}
Expand All @@ -641,28 +652,39 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) {
final Map<Integer, Integer> parents = TypeUtil.indexParents(schema.asStruct());
for (PartitionField field : spec.fields) {
Type sourceType = schema.findType(field.sourceId());
List<Type> sourceTypes = Lists.newArrayList();
for (int sourceId : field.sourceIds()) {
sourceTypes.add(schema.findType(sourceId));
}

Transform<?, ?> transform = field.transform();
// In the case the underlying field is dropped, we cannot check if they are compatible
if (allowMissingFields && sourceType == null) {
if (allowMissingFields && sourceTypes.isEmpty()) {
continue;
}

// In the case of a Version 1 partition-spec field gets deleted,
// it is replaced with a void transform, see:
// https://iceberg.apache.org/spec/#partition-transforms
// We don't care about the source type since a VoidTransform is always compatible and skip the
// checks
if (!transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType != null, "Cannot find source column for partition field: %s", field);
ValidationException.check(
sourceType.isPrimitiveType(),
"Cannot partition by non-primitive source field: %s",
sourceType);
!sourceTypes.isEmpty(), "Cannot find source column for partition field: %s", field);
for (Type sourceType : sourceTypes) {
ValidationException.check(
sourceType != null, "Cannot find source column for partition field: %s", field);
ValidationException.check(
sourceType.isPrimitiveType(),
"Cannot partition by non-primitive source field: %s (%s)",
field,
sourceType);
}

ValidationException.check(
transform.canTransform(sourceType),
transform.canTransform(sourceTypes),
"Invalid source type %s for transform: %s",
sourceType,
sourceTypes,
transform);
// The only valid parent types for a PartitionField are StructTypes. This must be checked
// recursively.
Expand All @@ -683,6 +705,7 @@ static boolean hasSequentialIds(PartitionSpec spec) {
return false;
}
}

return true;
}
}
37 changes: 27 additions & 10 deletions api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
Expand Down Expand Up @@ -58,17 +59,17 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) {
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);

for (UnboundPartitionField field : fields) {
Type fieldType = schema.findType(field.sourceId);
Type fieldType = schema.findType(field.sourceIds.get(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we don't have a multi-arg transform to resolve here yet but this doesn't seem like the right thing to do. I think Transforms.fromString needs to be modified to accept fromString(list[types], transformName)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agree. Let me update that.

Transform<?, ?> transform;
if (fieldType != null) {
transform = Transforms.fromString(fieldType, field.transform.toString());
} else {
transform = Transforms.fromString(field.transform.toString());
}
if (field.partitionId != null) {
builder.add(field.sourceId, field.partitionId, field.name, transform);
builder.add(field.sourceIds, field.partitionId, field.name, transform);
} else {
builder.add(field.sourceId, field.name, transform);
builder.add(field.sourceIds, field.name, transform);
}
}

Expand All @@ -92,14 +93,25 @@ Builder withSpecId(int newSpecId) {
return this;
}

// Adds an unbound field with an explicit partition field id, sourced from one or
// more columns (multi-argument transform support).
// NOTE(review): only null is rejected here — an empty sourceIds list is not; confirm
// whether emptiness should be validated here or at bind time.
Builder addField(
String transformAsString, List<Integer> sourceIds, int partitionId, String name) {
Preconditions.checkNotNull(sourceIds, "sourceIds is required");
fields.add(new UnboundPartitionField(transformAsString, sourceIds, partitionId, name));
return this;
}

// Single-source overload with an explicit partition field id; delegates to the list
// variant. (Stale pre-refactor fields.add(...) line removed — diff artifact.)
Builder addField(String transformAsString, int sourceId, int partitionId, String name) {
  return addField(transformAsString, List.of(sourceId), partitionId, name);
}

// Adds an unbound field without a partition field id (null partitionId means the id
// is assigned later, when the spec is bound to a schema).
Builder addField(String transformAsString, List<Integer> sourceIds, String name) {
Preconditions.checkNotNull(sourceIds, "sourceIds is required");
fields.add(new UnboundPartitionField(transformAsString, sourceIds, null, name));
return this;
}

// Single-source overload without a partition field id; delegates to the list variant.
// (Stale pre-refactor fields.add(...)/return lines removed — diff artifacts.)
Builder addField(String transformAsString, int sourceId, String name) {
  return addField(transformAsString, List.of(sourceId), name);
}

UnboundPartitionSpec build() {
Expand All @@ -109,7 +121,7 @@ UnboundPartitionSpec build() {

static class UnboundPartitionField {
private final Transform<?, ?> transform;
private final int sourceId;
private final List<Integer> sourceIds;
private final Integer partitionId;
private final String name;

Expand All @@ -122,7 +134,12 @@ public String transformAsString() {
}

/**
 * Returns the first source field id.
 *
 * <p>Unlike {@code PartitionField.sourceId()}, this tolerates multi-argument transforms
 * and returns the first id rather than requiring exactly one.
 *
 * @throws IllegalArgumentException if there are no source ids
 */
public int sourceId() {
  // Stale pre-refactor `return sourceId;` diff artifact removed.
  Preconditions.checkArgument(!sourceIds.isEmpty(), "At least one source is expected");
  return sourceIds.get(0);
}

/** Returns all source field ids for this unbound partition field. */
public List<Integer> sourceIds() {
return sourceIds;
}

public Integer partitionId() {
Expand All @@ -134,9 +151,9 @@ public String name() {
}

/**
 * @param transformAsString transform name, resolved via {@code Transforms.fromString}
 * @param sourceIds ids of the source columns; non-null, elements non-null
 * @param partitionId explicit partition field id, or null if assigned at bind time
 * @param name partition field name
 */
private UnboundPartitionField(
    String transformAsString, List<Integer> sourceIds, Integer partitionId, String name) {
  // NOTE(review): fromString here has no type context; per review discussion,
  // multi-arg transforms may need fromString(types, name) — resolved at bind time.
  this.transform = Transforms.fromString(transformAsString);
  // Defensive copy keeps the unbound field immutable regardless of the caller's list.
  // (Stale pre-refactor single-sourceId signature/assignment removed — diff artifacts.)
  this.sourceIds = List.copyOf(sourceIds);
  this.partitionId = partitionId;
  this.name = name;
}
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/transforms/Bucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import org.apache.iceberg.expressions.BoundPredicate;
Expand Down Expand Up @@ -135,6 +136,12 @@ public boolean canTransform(Type type) {
return false;
}

/**
 * Multi-argument overload: bucket accepts exactly one source type, which is then
 * checked by the single-type {@code canTransform(Type)}.
 *
 * @throws IllegalArgumentException if {@code types} does not contain exactly one element
 */
@Override
public boolean canTransform(List<Type> types) {
Preconditions.checkArgument(types.size() == 1, "Only one type is accepted");
return canTransform(types.get(0));
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -206,6 +213,12 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}

/**
 * Multi-argument overload: bucket derives its result type from exactly one source type.
 *
 * @throws IllegalArgumentException if {@code sourceTypes} does not contain exactly one element
 */
@Override
public Type getResultType(List<Type> sourceTypes) {
  // Typo fixed in the validation message ("accpeted" -> "accepted"), matching the
  // wording used by Dates, Days, and Hours.
  Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted");
  return getResultType(sourceTypes.get(0));
}

private static class BucketInteger extends Bucket<Integer>
implements SerializableFunction<Integer, Integer> {

Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/transforms/Dates.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.errorprone.annotations.Immutable;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundTransform;
import org.apache.iceberg.expressions.Expression;
Expand Down Expand Up @@ -97,6 +98,12 @@ public boolean canTransform(Type type) {
return type.typeId() == Type.TypeID.DATE;
}

/**
 * Multi-argument overload: a date transform accepts exactly one source type, which is
 * then checked by the single-type {@code canTransform(Type)}.
 *
 * @throws IllegalArgumentException if {@code types} does not contain exactly one element
 */
@Override
public boolean canTransform(List<Type> types) {
Preconditions.checkArgument(types.size() == 1, "Only one type is accepted");
return canTransform(types.get(0));
}

@Override
public Type getResultType(Type sourceType) {
if (granularity == ChronoUnit.DAYS) {
Expand All @@ -105,6 +112,12 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}

/**
 * Multi-argument overload: a date transform derives its result type from exactly one
 * source type.
 *
 * @throws IllegalArgumentException if {@code sourceTypes} does not contain exactly one element
 */
@Override
public Type getResultType(List<Type> sourceTypes) {
Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted");
return getResultType(sourceTypes.get(0));
}

ChronoUnit granularity() {
return granularity;
}
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/transforms/Days.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.ObjectStreamException;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand Down Expand Up @@ -47,6 +49,12 @@ public Type getResultType(Type sourceType) {
return Types.DateType.get();
}

/**
 * Multi-argument overload: days derives its result type from exactly one source type.
 *
 * @throws IllegalArgumentException if {@code sourceTypes} does not contain exactly one element
 */
@Override
public Type getResultType(List<Type> sourceTypes) {
Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted");
return getResultType(sourceTypes.get(0));
}

@Override
public String toHumanString(Type alwaysDate, Integer value) {
return value != null ? TransformUtil.humanDay(value) : "null";
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/transforms/Hours.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.ObjectStreamException;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand Down Expand Up @@ -53,6 +55,12 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}

/**
 * Multi-argument overload: hours derives its result type from exactly one source type.
 *
 * @throws IllegalArgumentException if {@code sourceTypes} does not contain exactly one element
 */
@Override
public Type getResultType(List<Type> sourceTypes) {
Preconditions.checkArgument(sourceTypes.size() == 1, "Only one source type is accepted");
return getResultType(sourceTypes.get(0));
}

@Override
public String toHumanString(Type alwaysInt, Integer value) {
return value != null ? TransformUtil.humanHour(value) : "null";
Expand Down
Loading
Loading