-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Implement source-ids to deal with multi arguments transforms #12897
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,7 +106,7 @@ public UnboundPartitionSpec toUnbound() { | |
|
|
||
| for (PartitionField field : fields) { | ||
| builder.addField( | ||
| field.transform().toString(), field.sourceId(), field.fieldId(), field.name()); | ||
| field.transform().toString(), field.sourceIds(), field.fieldId(), field.name()); | ||
| } | ||
|
|
||
| return builder.build(); | ||
|
|
@@ -608,17 +608,28 @@ public Builder alwaysNull(String sourceName) { | |
|
|
||
| // add a partition field with an auto-increment partition field id starting from | ||
| // PARTITION_DATA_ID_START | ||
| Builder add(List<Integer> sourceIds, String name, Transform<?, ?> transform) { | ||
| return add(sourceIds, nextFieldId(), name, transform); | ||
| } | ||
|
|
||
| Builder add(int sourceId, String name, Transform<?, ?> transform) { | ||
| return add(sourceId, nextFieldId(), name, transform); | ||
| return add(List.of(sourceId), name, transform); | ||
| } | ||
|
|
||
| Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) { | ||
| checkAndAddPartitionName(name, sourceId); | ||
| fields.add(new PartitionField(sourceId, fieldId, name, transform)); | ||
| Builder add(List<Integer> sourceIds, int fieldId, String name, Transform<?, ?> transform) { | ||
jbonofre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Preconditions.checkNotNull(sourceIds, "sourceIds is required"); | ||
| Preconditions.checkNotNull(sourceIds.get(0), "sourceIds needs at least one field"); | ||
| // we use the first entry in the source-ids list here | ||
| checkAndAddPartitionName(name, sourceIds.get(0)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this work for multi arg partition field?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The sourceId here is to resolve conflict (no impact on name): Let me check if we should compare
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current logic looks like if (check conflicts). {
if identity {
make sure we aren't using a different field name for this identity transform
} else {
make sure we aren't matching any other column name
}
}
Make sure it's not empty
Make sure we haven't already used this name for another partition
Add partitionI have no idea why those last 2 checks aren't part of the "if check conflicts" branch Anyyyyyyway. I think this whole validation probably should be rewritten. The first branch we are checking basically based a lot of implicit assumptions when we should just be passing in the transform. But we don't have to do any of that now. For now I think we should pass in sourceIds and just have the first branch include a "if sourceIds.length == 1"
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, let me refactore this part. |
||
| fields.add(new PartitionField(sourceIds, fieldId, name, transform)); | ||
| lastAssignedFieldId.getAndAccumulate(fieldId, Math::max); | ||
| return this; | ||
| } | ||
|
|
||
| Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) { | ||
| return add(List.of(sourceId), fieldId, name, transform); | ||
| } | ||
|
|
||
| public PartitionSpec build() { | ||
| return build(false); | ||
| } | ||
|
|
@@ -641,28 +652,39 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) { | |
| static void checkCompatibility(PartitionSpec spec, Schema schema, boolean allowMissingFields) { | ||
| final Map<Integer, Integer> parents = TypeUtil.indexParents(schema.asStruct()); | ||
| for (PartitionField field : spec.fields) { | ||
| Type sourceType = schema.findType(field.sourceId()); | ||
| List<Type> sourceTypes = Lists.newArrayList(); | ||
| for (int sourceId : field.sourceIds()) { | ||
| sourceTypes.add(schema.findType(sourceId)); | ||
| } | ||
jbonofre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Transform<?, ?> transform = field.transform(); | ||
| // In the case the underlying field is dropped, we cannot check if they are compatible | ||
| if (allowMissingFields && sourceType == null) { | ||
| if (allowMissingFields && sourceTypes.isEmpty()) { | ||
| continue; | ||
| } | ||
|
|
||
| // In the case of a Version 1 partition-spec field gets deleted, | ||
| // it is replaced with a void transform, see: | ||
| // https://iceberg.apache.org/spec/#partition-transforms | ||
| // We don't care about the source type since a VoidTransform is always compatible and skip the | ||
| // checks | ||
| if (!transform.equals(Transforms.alwaysNull())) { | ||
| ValidationException.check( | ||
| sourceType != null, "Cannot find source column for partition field: %s", field); | ||
| ValidationException.check( | ||
| sourceType.isPrimitiveType(), | ||
| "Cannot partition by non-primitive source field: %s", | ||
| sourceType); | ||
| !sourceTypes.isEmpty(), "Cannot find source column for partition field: %s", field); | ||
| for (Type sourceType : sourceTypes) { | ||
| ValidationException.check( | ||
| sourceType != null, "Cannot find source column for partition field: %s", field); | ||
| ValidationException.check( | ||
| sourceType.isPrimitiveType(), | ||
| "Cannot partition by non-primitive source field: %s (%s)", | ||
| field, | ||
| sourceType); | ||
| } | ||
jbonofre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ValidationException.check( | ||
| transform.canTransform(sourceType), | ||
| transform.canTransform(sourceTypes), | ||
| "Invalid source type %s for transform: %s", | ||
| sourceType, | ||
| sourceTypes, | ||
| transform); | ||
| // The only valid parent types for a PartitionField are StructTypes. This must be checked | ||
| // recursively. | ||
|
|
@@ -683,6 +705,7 @@ static boolean hasSequentialIds(PartitionSpec spec) { | |
| return false; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| package org.apache.iceberg; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.transforms.Transform; | ||
| import org.apache.iceberg.transforms.Transforms; | ||
|
|
@@ -58,17 +59,17 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) { | |
| PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId); | ||
|
|
||
| for (UnboundPartitionField field : fields) { | ||
| Type fieldType = schema.findType(field.sourceId); | ||
| Type fieldType = schema.findType(field.sourceIds.get(0)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know we don't have a multi-arg transform to resolve here yet but this doesn't seem like the right thing to do. I think Transforms.fromString needs to be modified to accept fromString(list[types], transformName)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, agree. Let me update that. |
||
| Transform<?, ?> transform; | ||
| if (fieldType != null) { | ||
| transform = Transforms.fromString(fieldType, field.transform.toString()); | ||
| } else { | ||
| transform = Transforms.fromString(field.transform.toString()); | ||
| } | ||
| if (field.partitionId != null) { | ||
| builder.add(field.sourceId, field.partitionId, field.name, transform); | ||
| builder.add(field.sourceIds, field.partitionId, field.name, transform); | ||
| } else { | ||
| builder.add(field.sourceId, field.name, transform); | ||
| builder.add(field.sourceIds, field.name, transform); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -92,14 +93,25 @@ Builder withSpecId(int newSpecId) { | |
| return this; | ||
| } | ||
|
|
||
| Builder addField( | ||
| String transformAsString, List<Integer> sourceIds, int partitionId, String name) { | ||
jbonofre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Preconditions.checkNotNull(sourceIds, "sourceIds is required"); | ||
| fields.add(new UnboundPartitionField(transformAsString, sourceIds, partitionId, name)); | ||
| return this; | ||
| } | ||
|
|
||
| Builder addField(String transformAsString, int sourceId, int partitionId, String name) { | ||
| fields.add(new UnboundPartitionField(transformAsString, sourceId, partitionId, name)); | ||
| return addField(transformAsString, List.of(sourceId), partitionId, name); | ||
| } | ||
|
|
||
| Builder addField(String transformAsString, List<Integer> sourceIds, String name) { | ||
| Preconditions.checkNotNull(sourceIds, "sourceIds is required"); | ||
| fields.add(new UnboundPartitionField(transformAsString, sourceIds, null, name)); | ||
| return this; | ||
| } | ||
|
|
||
| Builder addField(String transformAsString, int sourceId, String name) { | ||
| fields.add(new UnboundPartitionField(transformAsString, sourceId, null, name)); | ||
| return this; | ||
| return addField(transformAsString, List.of(sourceId), name); | ||
| } | ||
|
|
||
| UnboundPartitionSpec build() { | ||
|
|
@@ -109,7 +121,7 @@ UnboundPartitionSpec build() { | |
|
|
||
| static class UnboundPartitionField { | ||
| private final Transform<?, ?> transform; | ||
| private final int sourceId; | ||
| private final List<Integer> sourceIds; | ||
| private final Integer partitionId; | ||
| private final String name; | ||
|
|
||
|
|
@@ -122,7 +134,12 @@ public String transformAsString() { | |
| } | ||
|
|
||
| public int sourceId() { | ||
| return sourceId; | ||
| Preconditions.checkArgument(sourceIds.size() >= 1, "At least one source is expected"); | ||
| return sourceIds.get(0); | ||
| } | ||
|
|
||
| public List<Integer> sourceIds() { | ||
| return sourceIds; | ||
| } | ||
|
|
||
| public Integer partitionId() { | ||
|
|
@@ -134,9 +151,9 @@ public String name() { | |
| } | ||
|
|
||
| private UnboundPartitionField( | ||
| String transformAsString, int sourceId, Integer partitionId, String name) { | ||
| String transformAsString, List<Integer> sourceIds, Integer partitionId, String name) { | ||
| this.transform = Transforms.fromString(transformAsString); | ||
| this.sourceId = sourceId; | ||
| this.sourceIds = sourceIds; | ||
| this.partitionId = partitionId; | ||
| this.name = name; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.