From de835157d3e518c84fc594b330e99b539adda430 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Tue, 8 Jul 2025 22:58:14 +0100 Subject: [PATCH 1/3] Support Dynamic Partition Overwrite Signed-off-by: dttung2905 --- table/transaction.go | 246 ++++++++++++++++++++++++++++++++++++++ table/transaction_test.go | 202 +++++++++++++++++++++++++++++++ 2 files changed, 448 insertions(+) diff --git a/table/transaction.go b/table/transaction.go index 63790893e..a749d31bb 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -24,6 +24,8 @@ import ( "fmt" "runtime" "slices" + "strconv" + "strings" "sync" "time" @@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp return t.apply(updates, reqs) } +// DynamicPartitionOverwrite performs a dynamic partition overwrite operation. +// It detects partition values in the provided arrow table using the current +// partition spec, deletes existing partitions matching these values, and then +// appends the new data. +func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error { + if t.meta.CurrentSpec().IsUnpartitioned() { + return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation) + } + + // Check that all partition fields use identity transforms + currentSpec := t.meta.CurrentSpec() + for field := range currentSpec.Fields() { + if _, ok := field.Transform.(iceberg.IdentityTransform); !ok { + return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s", + ErrInvalidOperation, field.Name) + } + } + + if tbl.NumRows() == 0 { + return nil + } + + fs, err := t.tbl.fsF(ctx) + if err != nil { + return err + } + + commitUUID := uuid.New() + rdr := array.NewTableReader(tbl, batchSize) + defer rdr.Release() + dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{ + sc: tbl.Schema(), + itr: array.IterFromReader(rdr), + fs: fs.(io.WriteFileIO), + writeUUID: &commitUUID, + }) + + var allDataFiles []iceberg.DataFile + for df, err := range dataFiles { + if err != nil { + return err + } + allDataFiles = append(allDataFiles, df) + } + + partitionsToOverwrite := make(map[string]struct{}) + for _, df := range allDataFiles { + partitionKey := fmt.Sprintf("%v", df.Partition()) + partitionsToOverwrite[partitionKey] = struct{}{} + } + + deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite) + + if err := t.Delete(ctx, deleteFilter, snapshotProps); err != nil { + return err + } + + appendFiles := t.appendSnapshotProducer(fs, snapshotProps) + for _, df := range allDataFiles { + appendFiles.appendDataFile(df) + } + + updates, reqs, err := appendFiles.commit() + if err != nil { + return err + } + + return t.apply(updates, reqs) +} + +// Delete performs a delete operation with the given filter and snapshot properties. +func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { + fs, err := t.tbl.fsF(ctx) + if err != nil { + return err + } + + deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil) + + currentSnapshot := t.meta.currentSnapshot() + if currentSnapshot == nil { + return fmt.Errorf("%w: cannot delete from table without existing snapshot", ErrInvalidOperation) + } + + scan, err := t.Scan(WithRowFilter(filter)) + if err != nil { + return err + } + fileScan, err := scan.PlanFiles(ctx) + if err != nil { + return err + } + + // Mark files for deletion + for _, task := range fileScan { + deleteProducer.deleteDataFile(task.File) + } + + updates, reqs, err := deleteProducer.commit() + if err != nil { + return err + } + + return t.apply(updates, reqs) +} + +// buildPartitionPredicate builds a filter predicate matching any of the input partition records. +func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct{}) iceberg.BooleanExpression { + partitionSpec := t.meta.CurrentSpec() + schema := t.meta.CurrentSchema() + + var partitionFields []string + for field := range partitionSpec.Fields() { + if field, ok := schema.FindFieldByID(field.SourceID); ok { + partitionFields = append(partitionFields, field.Name) + } + } + + // Build OR expression for all partitions + var expressions []iceberg.BooleanExpression + + for partitionKey := range partitionRecords { + partitionValues := parsePartitionKey(partitionKey, partitionFields) + + // Build AND expression for this partition + var partitionExprs []iceberg.BooleanExpression + for i, fieldName := range partitionFields { + if i < len(partitionValues) { + value := partitionValues[i] + if value == nil { + partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(fieldName))) + } else { + // Create an expression based on a field type + if field, ok := schema.FindFieldByName(fieldName); ok { + partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(fieldName), value, field.Type)) + } + } + } + } + + if len(partitionExprs) > 0 { + partitionExpr := partitionExprs[0] + for _, expr := range partitionExprs[1:] { + partitionExpr = iceberg.NewAnd(partitionExpr, expr) + } + expressions = append(expressions, partitionExpr) + } + } + + if len(expressions) == 0 { + return iceberg.AlwaysFalse{} + } + + result := expressions[0] + for _, expr := range expressions[1:] { + result = iceberg.NewOr(result, expr) + } + + return result +} + +// parsePartitionKey parses a partition key string into individual values. +func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} { + // Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2" + parts := strings.Split(partitionKey, "/") + values := make([]interface{}, len(fieldNames)) + + for i, part := range parts { + if i >= len(fieldNames) { + break + } + + if strings.Contains(part, "=") { + kv := strings.SplitN(part, "=", 2) + if len(kv) == 2 { + values[i] = parsePartitionValue(kv[1]) + } + } + } + + return values +} + +// parsePartitionValue converts a string partition value to the appropriate type. +func parsePartitionValue(valueStr string) interface{} { + if valueStr == "null" || valueStr == "" { + return nil + } + + if i, err := strconv.ParseInt(valueStr, 10, 64); err == nil { + return i + } + + if f, err := strconv.ParseFloat(valueStr, 64); err == nil { + return f + } + + if b, err := strconv.ParseBool(valueStr); err == nil { + return b + } + + return valueStr +} + +// createEqualToExpression creates an EqualTo expression with the correct type +func createEqualToExpression(term iceberg.UnboundTerm, value interface{}, typ iceberg.Type) iceberg.BooleanExpression { + switch t := typ.(type) { + case iceberg.PrimitiveType: + switch t { + case iceberg.PrimitiveTypes.Int32: + if v, ok := value.(int32); ok { + return iceberg.EqualTo(term, v) + } + case iceberg.PrimitiveTypes.Int64: + if v, ok := value.(int64); ok { + return iceberg.EqualTo(term, v) + } + case iceberg.PrimitiveTypes.Float32: + if v, ok := value.(float32); ok { + return iceberg.EqualTo(term, v) + } + case iceberg.PrimitiveTypes.Float64: + if v, ok := value.(float64); ok { + return iceberg.EqualTo(term, v) + } + case iceberg.PrimitiveTypes.String: + if v, ok := value.(string); ok { + return iceberg.EqualTo(term, v) + } + case iceberg.PrimitiveTypes.Bool: + if v, ok := value.(bool); ok { + return iceberg.EqualTo(term, v) + } + } + } + + // Fallback to string + if v, ok := value.(string); ok { + return iceberg.EqualTo(term, v) + } + + return iceberg.EqualTo(term, fmt.Sprintf("%v", value)) +} + func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) { updatedMeta, err := t.meta.Build() if err != nil { diff --git a/table/transaction_test.go b/table/transaction_test.go index 40f015c65..664dade5a 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -34,9 +34,11 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" + "github.com/apache/iceberg-go/internal" "github.com/apache/iceberg-go/internal/recipe" iceio "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go/modules/compose" ) @@ -134,3 +136,203 @@ func (s *SparkIntegrationTestSuite) TestAddFile() { func TestSparkIntegration(t *testing.T) { suite.Run(t, new(SparkIntegrationTestSuite)) } + +// Dynamic Partition Overwrite Tests +func TestDynamicPartitionOverwrite_UnpartitionedTable(t *testing.T) { + // Create a table with no partition spec + schema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false}, + ) + + metadata, err := table.NewMetadataBuilder() + assert.NoError(t, err) + _, err = metadata.AddSchema(schema, 2, true) + assert.NoError(t, err) + _, err = metadata.SetCurrentSchemaID(0) + assert.NoError(t, err) + _, err = metadata.AddPartitionSpec(iceberg.UnpartitionedSpec, true) + assert.NoError(t, err) + _, err = metadata.SetDefaultSpecID(0) + assert.NoError(t, err) + _, err = metadata.SetFormatVersion(2) + assert.NoError(t, err) + + meta, err := metadata.Build() + assert.NoError(t, err) + + var mockfs internal.MockFS + mockfs.Test(t) + + table := table.New( + []string{"test", "table"}, + meta, + "test/location", + func(ctx context.Context) (iceio.IO, error) { + return &mockfs, nil + }, + nil, + ) + + txn := table.NewTransaction() + + // Create test data + mem := memory.DefaultAllocator + builder := array.NewInt32Builder(mem) + builder.AppendValues([]int32{1, 2, 3}, nil) + idArray := builder.NewArray() + + builder2 := array.NewStringBuilder(mem) + builder2.AppendValues([]string{"a", "b", "c"}, nil) + dataArray := builder2.NewArray() + + arrowSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + + record := array.NewRecord(arrowSchema, []arrow.Array{idArray, dataArray}, 3) + tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record}) + + // Should return an error for an unpartitioned table + err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot apply dynamic overwrite on an unpartitioned table") +} + +func TestDynamicPartitionOverwrite_NonIdentityTransform(t *testing.T) { + // Create a table with a non-identity transform + schema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false}, + ) + + partitionSpec := iceberg.NewPartitionSpec( + iceberg.PartitionField{ + SourceID: 1, + FieldID: 1000, + Name: "id_bucket", + Transform: iceberg.BucketTransform{NumBuckets: 4}, + }, + ) + + metadata, err := table.NewMetadataBuilder() + assert.NoError(t, err) + _, err = metadata.AddSchema(schema, 2, true) + assert.NoError(t, err) + _, err = metadata.SetCurrentSchemaID(0) + assert.NoError(t, err) + _, err = metadata.AddPartitionSpec(&partitionSpec, true) + assert.NoError(t, err) + _, err = metadata.SetDefaultSpecID(0) + assert.NoError(t, err) + _, err = metadata.SetFormatVersion(2) + assert.NoError(t, err) + + meta, err := metadata.Build() + assert.NoError(t, err) + + var mockfs internal.MockFS + mockfs.Test(t) + + table := table.New( + []string{"test", "table"}, + meta, + "test/location", + func(ctx context.Context) (iceio.IO, error) { + return &mockfs, nil + }, + nil, + ) + + txn := table.NewTransaction() + + // Create test data + mem := memory.DefaultAllocator + builder := array.NewInt32Builder(mem) + builder.AppendValues([]int32{1, 2, 3}, nil) + idArray := builder.NewArray() + + builder2 := array.NewStringBuilder(mem) + builder2.AppendValues([]string{"a", "b", "c"}, nil) + dataArray := builder2.NewArray() + + arrowSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + + record := array.NewRecord(arrowSchema, []arrow.Array{idArray, dataArray}, 3) + tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record}) + + // Should return an error for non-identity transform + err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "dynamic overwrite does not support non-identity-transform fields in partition spec") +} + +func TestDynamicPartitionOverwrite_EmptyTable(t *testing.T) { + // Create a table with identity transform + schema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false}, + ) + + partitionSpec := iceberg.NewPartitionSpec( + iceberg.PartitionField{ + SourceID: 1, + FieldID: 1000, + Name: "id", + Transform: iceberg.IdentityTransform{}, + }, + ) + + metadata, err := table.NewMetadataBuilder() + assert.NoError(t, err) + _, err = metadata.AddSchema(schema, 2, true) + assert.NoError(t, err) + _, err = metadata.SetCurrentSchemaID(0) + assert.NoError(t, err) + _, err = metadata.AddPartitionSpec(&partitionSpec, true) + assert.NoError(t, err) + _, err = metadata.SetDefaultSpecID(0) + assert.NoError(t, err) + + _, err = metadata.SetFormatVersion(2) + assert.NoError(t, err) + + meta, err := metadata.Build() + assert.NoError(t, err) + + var mockfs internal.MockFS + mockfs.Test(t) + + tbl := table.New( + []string{"test", "table"}, + meta, + "test/location", + func(ctx context.Context) (iceio.IO, error) { + return &mockfs, nil + }, + nil, + ) + + txn := tbl.NewTransaction() + + // Create empty test data (empty arrays for each field) + arrowSchema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + mem := memory.DefaultAllocator + idArr := array.NewInt32Builder(mem).NewArray() + dataArr := array.NewStringBuilder(mem).NewArray() + record := array.NewRecord(arrowSchema, []arrow.Array{idArr, dataArr}, 0) + tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record}) + + // Should return no error for an empty table + err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{}) + assert.NoError(t, err) +} + +// TODO: Find a way to test the happy path of DynamicPartitionOverwrite From 931506de1279fa64e59c95ab76c52db5f1c623f1 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Wed, 9 Jul 2025 18:49:10 +0100 Subject: [PATCH 2/3] Make deleteByFilter method private Signed-off-by: dttung2905 --- table/transaction.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/table/transaction.go b/table/transaction.go index a749d31bb..35c99dcf3 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -398,7 +398,7 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite) - if err := t.Delete(ctx, deleteFilter, snapshotProps); err != nil { + if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err != nil { return err } @@ -415,8 +415,8 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T return t.apply(updates, reqs) } -// Delete performs a delete operation with the given filter and snapshot properties. -func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { +// deleteFileByFilter performs a delete operation with the given filter and snapshot properties. +func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { fs, err := t.tbl.fsF(ctx) if err != nil { return err From 784146f88b10a58e80755a33256703f7c270912a Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Fri, 26 Dec 2025 19:23:18 +0000 Subject: [PATCH 3/3] Rebase from main and fix some code review comments Signed-off-by: dttung2905 --- table/arrow_utils.go | 220 +++++++++++++++++++++++++++++++++++++- table/transaction.go | 200 +++++++++++++++------------------- table/transaction_test.go | 2 +- 3 files changed, 306 insertions(+), 116 deletions(-) diff --git a/table/arrow_utils.go b/table/arrow_utils.go index 80856983c..2ec1667a9 100644 --- a/table/arrow_utils.go +++ b/table/arrow_utils.go @@ -1310,5 +1310,223 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata return writeFiles(ctx, rootLocation, args.fs, meta, tasks) } - panic(fmt.Errorf("%w: write stream with partitions", iceberg.ErrNotImplemented)) + // For partitioned tables, group records by partition and write each group separately + // This is a basic implementation for identity transforms (required by dynamic partition overwrite) + return writePartitionedFiles(ctx, rootLocation, args.fs, meta, args, nextCount, stopCount, taskSchema, targetFileSize) +} + +// writePartitionedFiles groups records by partition and writes each group as a separate file. +// This implementation supports identity transforms only (required by dynamic partition overwrite). +func writePartitionedFiles( + ctx context.Context, + rootLocation string, + fs iceio.WriteFileIO, + meta *MetadataBuilder, + args recordWritingArgs, + nextCount func() (int, bool), + stopCount func(), + taskSchema *iceberg.Schema, + targetFileSize int64, +) iter.Seq2[iceberg.DataFile, error] { + partitionSpec := meta.CurrentSpec() + schema := meta.CurrentSchema() + + // Collect all records first + var allRecords []arrow.Record + for rec, err := range args.itr { + if err != nil { + return func(yield func(iceberg.DataFile, error) bool) { + yield(nil, err) + } + } + rec.Retain() + allRecords = append(allRecords, rec) + } + defer func() { + for _, rec := range allRecords { + rec.Release() + } + }() + + if len(allRecords) == 0 { + return func(yield func(iceberg.DataFile, error) bool) {} + } + + // Get partition field information + var partitionFields []iceberg.PartitionField + var partitionSourceNames []string + for field := range partitionSpec.Fields() { + partitionFields = append(partitionFields, field) + if sourceField, ok := schema.FindFieldByID(field.SourceID); ok { + partitionSourceNames = append(partitionSourceNames, sourceField.Name) + } + } + + // For identity transforms, we can extract partition values directly from the records + // Group records by partition + partitionGroups := make(map[string][]arrow.Record) + + for _, rec := range allRecords { + // Extract partition key from first row (all rows in a record should have same partition for identity) + partitionKey := extractPartitionKey(rec, partitionFields, partitionSourceNames, schema) + + group := partitionGroups[partitionKey] + group = append(group, rec) + partitionGroups[partitionKey] = group + } + + // Write each partition group + return func(yield func(iceberg.DataFile, error) bool) { + defer stopCount() + + for partitionKey, records := range partitionGroups { + _ = partitionKey + + for batch := range binPackPartitionRecords(records, 20, targetFileSize) { + cnt, _ := nextCount() + t := WriteTask{ + Uuid: *args.writeUUID, + ID: cnt, + Schema: taskSchema, + Batches: batch, + } + + df, err := writePartitionedBatch(ctx, rootLocation, fs, meta, t, partitionSpec) + if err != nil { + if !yield(nil, err) { + return + } + continue + } + + if !yield(df, nil) { + return + } + } + } + } +} + +// extractPartitionKey extracts a partition key string from the first row of a record. +// This works for identity transforms only. +func extractPartitionKey(rec arrow.Record, partitionFields []iceberg.PartitionField, partitionSourceNames []string, schema *iceberg.Schema) string { + if rec.NumRows() == 0 { + return "" + } + + var keyParts []string + for i, field := range partitionFields { + if i >= len(partitionSourceNames) { + continue + } + + colName := partitionSourceNames[i] + colIdx := rec.Schema().FieldIndices(colName) + if len(colIdx) == 0 { + keyParts = append(keyParts, fmt.Sprintf("%d=null", field.FieldID)) + continue + } + + arr := rec.Column(colIdx[0]) + if arr.Len() == 0 { + keyParts = append(keyParts, fmt.Sprintf("%d=null", field.FieldID)) + continue + } + + // Get value from first row + val := getValueFromArray(arr, 0) + keyParts = append(keyParts, fmt.Sprintf("%d=%v", field.FieldID, val)) + } + + return strings.Join(keyParts, "/") +} + +// getValueFromArray extracts a value from an Arrow array at the given index. +// For chunked arrays, it uses the first chunk. +func getValueFromArray(arr arrow.Array, idx int) interface{} { + if arr.Len() <= idx { + return nil + } + + if arr.IsNull(idx) { + return nil + } + + // Handle different array types + switch a := arr.(type) { + case *array.Int32: + return a.Value(idx) + case *array.Int64: + return a.Value(idx) + case *array.Float32: + return a.Value(idx) + case *array.Float64: + return a.Value(idx) + case *array.String: + return a.Value(idx) + case *array.Boolean: + return a.Value(idx) + case *array.Date32: + return a.Value(idx) + case *array.Timestamp: + return a.Value(idx) + default: + // For other types, use string representation + return a.ValueStr(idx) + } +} + +// binPackPartitionRecords bins records into batches similar to binPackRecords. +func binPackPartitionRecords(records []arrow.Record, recordLookback int, targetFileSize int64) iter.Seq[[]arrow.Record] { + return internal.PackingIterator(func(yield func(arrow.Record) bool) { + for _, rec := range records { + rec.Retain() + if !yield(rec) { + return + } + } + }, targetFileSize, recordLookback, recordNBytes, false) +} + +// writePartitionedBatch writes a batch of records for a partitioned table. +func writePartitionedBatch(ctx context.Context, rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, task WriteTask, spec iceberg.PartitionSpec) (iceberg.DataFile, error) { + locProvider, err := LoadLocationProvider(rootLocation, meta.props) + if err != nil { + return nil, err + } + + format := tblutils.GetFileFormat(iceberg.ParquetFile) + fileSchema := meta.CurrentSchema() + sanitized, err := iceberg.SanitizeColumnNames(fileSchema) + if err != nil { + return nil, err + } + + if !sanitized.Equals(fileSchema) { + fileSchema = sanitized + } + + batches := make([]arrow.Record, len(task.Batches)) + for i, b := range task.Batches { + rec, err := ToRequestedSchema(ctx, fileSchema, task.Schema, b, false, true, false) + if err != nil { + return nil, err + } + batches[i] = rec + } + + statsCols, err := computeStatsPlan(fileSchema, meta.props) + if err != nil { + return nil, err + } + + filePath := locProvider.NewDataLocation(task.GenerateDataFileName("parquet")) + + return format.WriteDataFile(ctx, fs, tblutils.WriteFileInfo{ + FileSchema: fileSchema, + Spec: spec, + FileName: filePath, + StatsCols: statsCols, + WriteProps: format.GetWriteProperties(meta.props), + }, batches) } diff --git a/table/transaction.go b/table/transaction.go index 35c99dcf3..92c3be073 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -24,7 +24,6 @@ import ( "fmt" "runtime" "slices" - "strconv" "strings" "sync" "time" @@ -354,11 +353,16 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation) } - // Check that all partition fields use identity transforms + // TODO: Support non-identity transforms in dynamic partition overwrite. + // Currently, this is a limitation of the Go implementation. The Iceberg spec and Java + // implementation (BaseReplacePartitions) support any partition spec, but building partition + // predicates for non-identity transforms (e.g., day(ts) = X) requires transform-aware + // predicate construction which is not yet implemented. + // Reference: iceberg/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java currentSpec := t.meta.CurrentSpec() for field := range currentSpec.Fields() { if _, ok := field.Transform.(iceberg.IdentityTransform); !ok { - return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s", + return fmt.Errorf("%w: dynamic overwrite currently only supports identity-transform fields in partition spec (limitation, not spec requirement): %s", ErrInvalidOperation, field.Name) } } @@ -382,23 +386,37 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T writeUUID: &commitUUID, }) + // Collect data files and unique partitions in one pass var allDataFiles []iceberg.DataFile + partitionsToOverwrite := make([]map[int]any, 0) + partitionSet := make(map[string]struct{}) for df, err := range dataFiles { if err != nil { return err } allDataFiles = append(allDataFiles, df) - } - partitionsToOverwrite := make(map[string]struct{}) - for _, df := range allDataFiles { - partitionKey := fmt.Sprintf("%v", df.Partition()) - partitionsToOverwrite[partitionKey] = struct{}{} + // Extract partition if not empty + partition := df.Partition() + if len(partition) == 0 { + continue + } + + partitionKey := partitionMapKey(partition) + if _, seen := partitionSet[partitionKey]; !seen { + partitionSet[partitionKey] = struct{}{} + // Make a copy of the partition map + partitionCopy := make(map[int]any, len(partition)) + for k, v := range partition { + partitionCopy[k] = v + } + partitionsToOverwrite = append(partitionsToOverwrite, partitionCopy) + } } deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite) - if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err != nil { + if err := t.deleteFileByFilter(ctx, deleteFilter, &commitUUID, snapshotProps); err != nil { return err } @@ -416,13 +434,13 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T } // deleteFileByFilter performs a delete operation with the given filter and snapshot properties. -func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { +func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) error { fs, err := t.tbl.fsF(ctx) if err != nil { return err } - deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil) + deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(commitUUID) currentSnapshot := t.meta.currentSnapshot() if currentSnapshot == nil { @@ -452,43 +470,39 @@ func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.Boo } // buildPartitionPredicate builds a filter predicate matching any of the input partition records. -func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct{}) iceberg.BooleanExpression { +// partitionRecords is a slice of partition maps, where each map is keyed by partition field ID. +func (t *Transaction) buildPartitionPredicate(partitionRecords []map[int]any) iceberg.BooleanExpression { partitionSpec := t.meta.CurrentSpec() schema := t.meta.CurrentSchema() - var partitionFields []string - for field := range partitionSpec.Fields() { - if field, ok := schema.FindFieldByID(field.SourceID); ok { - partitionFields = append(partitionFields, field.Name) - } - } - // Build OR expression for all partitions var expressions []iceberg.BooleanExpression - for partitionKey := range partitionRecords { - partitionValues := parsePartitionKey(partitionKey, partitionFields) - + for _, partitionMap := range partitionRecords { // Build AND expression for this partition var partitionExprs []iceberg.BooleanExpression - for i, fieldName := range partitionFields { - if i < len(partitionValues) { - value := partitionValues[i] - if value == nil { - partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(fieldName))) - } else { - // Create an expression based on a field type - if field, ok := schema.FindFieldByName(fieldName); ok { - partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(fieldName), value, field.Type)) - } - } + + for field := range partitionSpec.Fields() { + sourceField, ok := schema.FindFieldByID(field.SourceID) + if !ok { + continue + } + + partitionValue, hasValue := partitionMap[field.FieldID] + + if !hasValue || partitionValue == nil { + partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(sourceField.Name))) + } else { + partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(sourceField.Name), partitionValue)) } } if len(partitionExprs) > 0 { - partitionExpr := partitionExprs[0] - for _, expr := range partitionExprs[1:] { - partitionExpr = iceberg.NewAnd(partitionExpr, expr) + var partitionExpr iceberg.BooleanExpression + if len(partitionExprs) == 1 { + partitionExpr = partitionExprs[0] + } else { + partitionExpr = iceberg.NewAnd(partitionExprs[0], partitionExprs[1], partitionExprs[2:]...) } expressions = append(expressions, partitionExpr) } @@ -498,95 +512,53 @@ func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct return iceberg.AlwaysFalse{} } - result := expressions[0] - for _, expr := range expressions[1:] { - result = iceberg.NewOr(result, expr) + if len(expressions) == 1 { + return expressions[0] } - return result + return iceberg.NewOr(expressions[0], expressions[1], expressions[2:]...) } -// parsePartitionKey parses a partition key string into individual values. -func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} { - // Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2" - parts := strings.Split(partitionKey, "/") - values := make([]interface{}, len(fieldNames)) - - for i, part := range parts { - if i >= len(fieldNames) { - break - } - - if strings.Contains(part, "=") { - kv := strings.SplitN(part, "=", 2) - if len(kv) == 2 { - values[i] = parsePartitionValue(kv[1]) - } - } +// partitionMapKey creates a canonical string representation of a partition map for deduplication. +// This is used to identify unique partitions, but the actual partition map is used for predicate building. +func partitionMapKey(partition map[int]any) string { + // Sort keys for consistent representation + keys := make([]int, 0, len(partition)) + for k := range partition { + keys = append(keys, k) } + slices.Sort(keys) - return values -} - -// parsePartitionValue converts a string partition value to the appropriate type. -func parsePartitionValue(valueStr string) interface{} { - if valueStr == "null" || valueStr == "" { - return nil + var parts []string + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%d=%v", k, partition[k])) } - - if i, err := strconv.ParseInt(valueStr, 10, 64); err == nil { - return i - } - - if f, err := strconv.ParseFloat(valueStr, 64); err == nil { - return f - } - - if b, err := strconv.ParseBool(valueStr); err == nil { - return b - } - - return valueStr + return strings.Join(parts, "/") } -// createEqualToExpression creates an EqualTo expression with the correct type -func createEqualToExpression(term iceberg.UnboundTerm, value interface{}, typ iceberg.Type) iceberg.BooleanExpression { - switch t := typ.(type) { - case iceberg.PrimitiveType: - switch t { - case iceberg.PrimitiveTypes.Int32: - if v, ok := value.(int32); ok { - return iceberg.EqualTo(term, v) - } - case iceberg.PrimitiveTypes.Int64: - if v, ok := value.(int64); ok { - return iceberg.EqualTo(term, v) - } - case iceberg.PrimitiveTypes.Float32: - if v, ok := value.(float32); ok { - return iceberg.EqualTo(term, v) - } - case iceberg.PrimitiveTypes.Float64: - if v, ok := value.(float64); ok { - return iceberg.EqualTo(term, v) - } - case iceberg.PrimitiveTypes.String: - if v, ok := value.(string); ok { - return iceberg.EqualTo(term, v) - } - case iceberg.PrimitiveTypes.Bool: - if v, ok := value.(bool); ok { - return iceberg.EqualTo(term, v) - } - } - } - - // Fallback to string - if v, ok := value.(string); ok { +// createEqualToExpression creates an EqualTo expression by switching on the value type. +// Type conversion is handled automatically when the expression is bound to a schema. +func createEqualToExpression(term iceberg.UnboundTerm, value interface{}) iceberg.BooleanExpression { + switch v := value.(type) { + case int32: + return iceberg.EqualTo(term, v) + case int64: return iceberg.EqualTo(term, v) + case int: + return iceberg.EqualTo(term, int64(v)) + case float32: + return iceberg.EqualTo(term, v) + case float64: + return iceberg.EqualTo(term, v) + case string: + return iceberg.EqualTo(term, v) + case bool: + return iceberg.EqualTo(term, v) + case nil: + return iceberg.IsNull(term) + default: + return iceberg.EqualTo(term, fmt.Sprintf("%v", value)) } - - return iceberg.EqualTo(term, fmt.Sprintf("%v", value)) } func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) { diff --git a/table/transaction_test.go b/table/transaction_test.go index 664dade5a..d7e8a9d00 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -268,7 +268,7 @@ func TestDynamicPartitionOverwrite_NonIdentityTransform(t *testing.T) { // Should return an error for non-identity transform err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{}) assert.Error(t, err) - assert.Contains(t, err.Error(), "dynamic overwrite does not support non-identity-transform fields in partition spec") + assert.Contains(t, err.Error(), "dynamic overwrite currently only supports identity-transform fields in partition spec (limitation, not spec requirement)") } func TestDynamicPartitionOverwrite_EmptyTable(t *testing.T) {