diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 80856983c..2ec1667a9 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1310,5 +1310,223 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
 		return writeFiles(ctx, rootLocation, args.fs, meta, tasks)
 	}
 
-	panic(fmt.Errorf("%w: write stream with partitions", iceberg.ErrNotImplemented))
+	// For partitioned tables, group records by partition and write each group separately.
+	// This is a basic implementation for identity transforms (required by dynamic partition overwrite).
+	return writePartitionedFiles(ctx, rootLocation, args.fs, meta, args, nextCount, stopCount, taskSchema, targetFileSize)
+}
+
+// writePartitionedFiles groups records by partition and writes each group as a separate file.
+// This implementation supports identity transforms only (required by dynamic partition overwrite).
+func writePartitionedFiles(
+	ctx context.Context,
+	rootLocation string,
+	fs iceio.WriteFileIO,
+	meta *MetadataBuilder,
+	args recordWritingArgs,
+	nextCount func() (int, bool),
+	stopCount func(),
+	taskSchema *iceberg.Schema,
+	targetFileSize int64,
+) iter.Seq2[iceberg.DataFile, error] {
+	partitionSpec := meta.CurrentSpec()
+	schema := meta.CurrentSchema()
+
+	// Collect all records first. They are retained here and released only after the
+	// returned iterator has finished, since the partition groups keep referencing them.
+	var allRecords []arrow.Record
+	releaseAll := func() {
+		for _, rec := range allRecords {
+			rec.Release()
+		}
+	}
+
+	for rec, err := range args.itr {
+		if err != nil {
+			releaseAll()
+			return func(yield func(iceberg.DataFile, error) bool) {
+				yield(nil, err)
+			}
+		}
+		rec.Retain()
+		allRecords = append(allRecords, rec)
+	}
+
+	if len(allRecords) == 0 {
+		return func(yield func(iceberg.DataFile, error) bool) {}
+	}
+
+	// Get partition field information
+	var partitionFields []iceberg.PartitionField
+	var partitionSourceNames []string
+	for field := range partitionSpec.Fields() {
+		partitionFields = append(partitionFields, field)
+		if sourceField, ok := schema.FindFieldByID(field.SourceID); ok {
+			partitionSourceNames = append(partitionSourceNames, sourceField.Name)
+		}
+	}
+
+	// For identity transforms, we can extract partition values directly from the records.
+	// Group records by partition.
+	partitionGroups := make(map[string][]arrow.Record)
+
+	for _, rec := range allRecords {
+		// Extract partition key from first row (all rows in a record should have same partition for identity)
+		partitionKey := extractPartitionKey(rec, partitionFields, partitionSourceNames, schema)
+
+		partitionGroups[partitionKey] = append(partitionGroups[partitionKey], rec)
+	}
+
+	// Write each partition group
+	return func(yield func(iceberg.DataFile, error) bool) {
+		defer stopCount()
+		defer releaseAll()
+
+		for _, records := range partitionGroups {
+			for batch := range binPackPartitionRecords(records, 20, targetFileSize) {
+				cnt, _ := nextCount()
+				t := WriteTask{
+					Uuid:    *args.writeUUID,
+					ID:      cnt,
+					Schema:  taskSchema,
+					Batches: batch,
+				}
+
+				df, err := writePartitionedBatch(ctx, rootLocation, fs, meta, t, partitionSpec)
+				if err != nil {
+					if !yield(nil, err) {
+						return
+					}
+					continue
+				}
+
+				if !yield(df, nil) {
+					return
+				}
+			}
+		}
+	}
+}
+
+// extractPartitionKey extracts a partition key string from the first row of a record.
+// This works for identity transforms only.
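+// For example, a spec with identity partitions over "region" and "bucket_id"
+// might produce a key such as "1000=eu/1001=7", where 1000 and 1001 are
+// illustrative partition field IDs and the values come from the first row.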
+func extractPartitionKey(rec arrow.Record, partitionFields []iceberg.PartitionField, partitionSourceNames []string, schema *iceberg.Schema) string {
+	if rec.NumRows() == 0 {
+		return ""
+	}
+
+	var keyParts []string
+	for i, field := range partitionFields {
+		if i >= len(partitionSourceNames) {
+			continue
+		}
+
+		colName := partitionSourceNames[i]
+		colIdx := rec.Schema().FieldIndices(colName)
+		if len(colIdx) == 0 {
+			keyParts = append(keyParts, fmt.Sprintf("%d=null", field.FieldID))
+			continue
+		}
+
+		arr := rec.Column(colIdx[0])
+		if arr.Len() == 0 {
+			keyParts = append(keyParts, fmt.Sprintf("%d=null", field.FieldID))
+			continue
+		}
+
+		// Get value from first row
+		val := getValueFromArray(arr, 0)
+		keyParts = append(keyParts, fmt.Sprintf("%d=%v", field.FieldID, val))
+	}
+
+	return strings.Join(keyParts, "/")
+}
+
+// getValueFromArray extracts a value from an Arrow array at the given index.
+// It returns nil for out-of-range indices and null entries.
+func getValueFromArray(arr arrow.Array, idx int) interface{} {
+	if arr.Len() <= idx {
+		return nil
+	}
+
+	if arr.IsNull(idx) {
+		return nil
+	}
+
+	// Handle the common primitive array types explicitly
+	switch a := arr.(type) {
+	case *array.Int32:
+		return a.Value(idx)
+	case *array.Int64:
+		return a.Value(idx)
+	case *array.Float32:
+		return a.Value(idx)
+	case *array.Float64:
+		return a.Value(idx)
+	case *array.String:
+		return a.Value(idx)
+	case *array.Boolean:
+		return a.Value(idx)
+	case *array.Date32:
+		return a.Value(idx)
+	case *array.Timestamp:
+		return a.Value(idx)
+	default:
+		// For other types, fall back to the string representation
+		return a.ValueStr(idx)
+	}
+}
+
+// binPackPartitionRecords bins records into batches, similar to binPackRecords.
+func binPackPartitionRecords(records []arrow.Record, recordLookback int, targetFileSize int64) iter.Seq[[]arrow.Record] {
+	return internal.PackingIterator(func(yield func(arrow.Record) bool) {
+		for _, rec := range records {
+			rec.Retain()
+			if !yield(rec) {
+				return
+			}
+		}
+	}, targetFileSize, recordLookback, recordNBytes, false)
+}
+
+// writePartitionedBatch writes a batch of records for a partitioned table.
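+// Column names are sanitized before writing, and the resulting data file is
+// always written as Parquet.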
+func writePartitionedBatch(ctx context.Context, rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, task WriteTask, spec iceberg.PartitionSpec) (iceberg.DataFile, error) {
+	locProvider, err := LoadLocationProvider(rootLocation, meta.props)
+	if err != nil {
+		return nil, err
+	}
+
+	format := tblutils.GetFileFormat(iceberg.ParquetFile)
+	fileSchema := meta.CurrentSchema()
+	sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+	if err != nil {
+		return nil, err
+	}
+
+	if !sanitized.Equals(fileSchema) {
+		fileSchema = sanitized
+	}
+
+	batches := make([]arrow.Record, len(task.Batches))
+	for i, b := range task.Batches {
+		rec, err := ToRequestedSchema(ctx, fileSchema, task.Schema, b, false, true, false)
+		if err != nil {
+			return nil, err
+		}
+		batches[i] = rec
+	}
+
+	statsCols, err := computeStatsPlan(fileSchema, meta.props)
+	if err != nil {
+		return nil, err
+	}
+
+	filePath := locProvider.NewDataLocation(task.GenerateDataFileName("parquet"))
+
+	return format.WriteDataFile(ctx, fs, tblutils.WriteFileInfo{
+		FileSchema: fileSchema,
+		Spec:       spec,
+		FileName:   filePath,
+		StatsCols:  statsCols,
+		WriteProps: format.GetWriteProperties(meta.props),
+	}, batches)
 }
diff --git a/table/transaction.go b/table/transaction.go
index 63790893e..92c3be073 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"runtime"
 	"slices"
+	"strings"
 	"sync"
 	"time"
 
@@ -343,6 +344,223 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
 	return t.apply(updates, reqs)
 }
 
+// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
+// It detects partition values in the provided arrow table using the current
+// partition spec, deletes existing partitions matching these values, and then
+// appends the new data.
+func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
+	if t.meta.CurrentSpec().IsUnpartitioned() {
+		return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation)
+	}
+
+	// TODO: Support non-identity transforms in dynamic partition overwrite.
+	// Currently, this is a limitation of the Go implementation. The Iceberg spec and Java
+	// implementation (BaseReplacePartitions) support any partition spec, but building partition
+	// predicates for non-identity transforms (e.g., day(ts) = X) requires transform-aware
+	// predicate construction which is not yet implemented.
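+	// For example, a partition produced by day(ts) stores a day ordinal rather than a
+	// timestamp, so overwriting it would need a predicate projected through the day
+	// transform instead of the plain column equality built below.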
+	// Reference: iceberg/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+	currentSpec := t.meta.CurrentSpec()
+	for field := range currentSpec.Fields() {
+		if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
+			return fmt.Errorf("%w: dynamic overwrite currently only supports identity-transform fields in partition spec (limitation, not spec requirement): %s",
+				ErrInvalidOperation, field.Name)
+		}
+	}
+
+	if tbl.NumRows() == 0 {
+		return nil
+	}
+
+	fs, err := t.tbl.fsF(ctx)
+	if err != nil {
+		return err
+	}
+
+	commitUUID := uuid.New()
+	rdr := array.NewTableReader(tbl, batchSize)
+	defer rdr.Release()
+	dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+		sc:        tbl.Schema(),
+		itr:       array.IterFromReader(rdr),
+		fs:        fs.(io.WriteFileIO),
+		writeUUID: &commitUUID,
+	})
+
+	// Collect data files and unique partitions in one pass
+	var allDataFiles []iceberg.DataFile
+	partitionsToOverwrite := make([]map[int]any, 0)
+	partitionSet := make(map[string]struct{})
+	for df, err := range dataFiles {
+		if err != nil {
+			return err
+		}
+		allDataFiles = append(allDataFiles, df)
+
+		// Extract partition if not empty
+		partition := df.Partition()
+		if len(partition) == 0 {
+			continue
+		}
+
+		partitionKey := partitionMapKey(partition)
+		if _, seen := partitionSet[partitionKey]; !seen {
+			partitionSet[partitionKey] = struct{}{}
+			// Make a copy of the partition map
+			partitionCopy := make(map[int]any, len(partition))
+			for k, v := range partition {
+				partitionCopy[k] = v
+			}
+			partitionsToOverwrite = append(partitionsToOverwrite, partitionCopy)
+		}
+	}
+
+	deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)
+
+	if err := t.deleteFileByFilter(ctx, deleteFilter, &commitUUID, snapshotProps); err != nil {
+		return err
+	}
+
+	appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+	for _, df := range allDataFiles {
+		appendFiles.appendDataFile(df)
+	}
+
+	updates, reqs, err := appendFiles.commit()
+	if err != nil {
+		return err
+	}
+
+	return t.apply(updates, reqs)
+}
+
+// deleteFileByFilter performs a delete operation with the given filter and snapshot properties.
+func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) error {
+	fs, err := t.tbl.fsF(ctx)
+	if err != nil {
+		return err
+	}
+
+	deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(commitUUID)
+
+	currentSnapshot := t.meta.currentSnapshot()
+	if currentSnapshot == nil {
+		return fmt.Errorf("%w: cannot delete from table without existing snapshot", ErrInvalidOperation)
+	}
+
+	scan, err := t.Scan(WithRowFilter(filter))
+	if err != nil {
+		return err
+	}
+	fileScan, err := scan.PlanFiles(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Mark files for deletion
+	for _, task := range fileScan {
+		deleteProducer.deleteDataFile(task.File)
+	}
+
+	updates, reqs, err := deleteProducer.commit()
+	if err != nil {
+		return err
+	}
+
+	return t.apply(updates, reqs)
+}
+
+// buildPartitionPredicate builds a filter predicate matching any of the input partition records.
+// partitionRecords is a slice of partition maps, where each map is keyed by partition field ID.
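+// For example, given two partition records {1000: "eu"} and {1000: "us"} for an
+// identity partition over "region", the resulting filter is roughly
+// (region == "eu") OR (region == "us"); the field ID 1000 is illustrative.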
+func (t *Transaction) buildPartitionPredicate(partitionRecords []map[int]any) iceberg.BooleanExpression {
+	partitionSpec := t.meta.CurrentSpec()
+	schema := t.meta.CurrentSchema()
+
+	// Build OR expression for all partitions
+	var expressions []iceberg.BooleanExpression
+
+	for _, partitionMap := range partitionRecords {
+		// Build AND expression for this partition
+		var partitionExprs []iceberg.BooleanExpression
+
+		for field := range partitionSpec.Fields() {
+			sourceField, ok := schema.FindFieldByID(field.SourceID)
+			if !ok {
+				continue
+			}
+
+			partitionValue, hasValue := partitionMap[field.FieldID]
+
+			if !hasValue || partitionValue == nil {
+				partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(sourceField.Name)))
+			} else {
+				partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(sourceField.Name), partitionValue))
+			}
+		}
+
+		if len(partitionExprs) > 0 {
+			var partitionExpr iceberg.BooleanExpression
+			if len(partitionExprs) == 1 {
+				partitionExpr = partitionExprs[0]
+			} else {
+				partitionExpr = iceberg.NewAnd(partitionExprs[0], partitionExprs[1], partitionExprs[2:]...)
+			}
+			expressions = append(expressions, partitionExpr)
+		}
+	}
+
+	if len(expressions) == 0 {
+		return iceberg.AlwaysFalse{}
+	}
+
+	if len(expressions) == 1 {
+		return expressions[0]
+	}
+
+	return iceberg.NewOr(expressions[0], expressions[1], expressions[2:]...)
+}
+
+// partitionMapKey creates a canonical string representation of a partition map for deduplication.
+// This is used to identify unique partitions, but the actual partition map is used for predicate building.
+func partitionMapKey(partition map[int]any) string {
+	// Sort keys for consistent representation
+	keys := make([]int, 0, len(partition))
+	for k := range partition {
+		keys = append(keys, k)
+	}
+	slices.Sort(keys)
+
+	var parts []string
+	for _, k := range keys {
+		parts = append(parts, fmt.Sprintf("%d=%v", k, partition[k]))
+	}
+
+	return strings.Join(parts, "/")
+}
+
+// createEqualToExpression creates an EqualTo expression by switching on the value type.
+// Type conversion is handled automatically when the expression is bound to a schema.
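+// Values of types not handled below fall back to their string representation via
+// the default case, which is a best-effort fallback for identity-partition sources.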
+func createEqualToExpression(term iceberg.UnboundTerm, value interface{}) iceberg.BooleanExpression {
+	switch v := value.(type) {
+	case int32:
+		return iceberg.EqualTo(term, v)
+	case int64:
+		return iceberg.EqualTo(term, v)
+	case int:
+		return iceberg.EqualTo(term, int64(v))
+	case float32:
+		return iceberg.EqualTo(term, v)
+	case float64:
+		return iceberg.EqualTo(term, v)
+	case string:
+		return iceberg.EqualTo(term, v)
+	case bool:
+		return iceberg.EqualTo(term, v)
+	case nil:
+		return iceberg.IsNull(term)
+	default:
+		return iceberg.EqualTo(term, fmt.Sprintf("%v", value))
+	}
+}
+
 func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
 	updatedMeta, err := t.meta.Build()
 	if err != nil {
diff --git a/table/transaction_test.go b/table/transaction_test.go
index 40f015c65..d7e8a9d00 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -34,9 +34,11 @@ import (
 	"github.com/apache/iceberg-go"
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/rest"
+	"github.com/apache/iceberg-go/internal"
 	"github.com/apache/iceberg-go/internal/recipe"
 	iceio "github.com/apache/iceberg-go/io"
 	"github.com/apache/iceberg-go/table"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
 	"github.com/testcontainers/testcontainers-go/modules/compose"
 )
@@ -134,3 +136,203 @@ func (s *SparkIntegrationTestSuite) TestAddFile() {
 func TestSparkIntegration(t *testing.T) {
 	suite.Run(t, new(SparkIntegrationTestSuite))
 }
+
+// Dynamic Partition Overwrite Tests
+func TestDynamicPartitionOverwrite_UnpartitionedTable(t *testing.T) {
+	// Create a table with no partition spec
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	metadata, err := table.NewMetadataBuilder()
+	assert.NoError(t, err)
+	_, err = metadata.AddSchema(schema, 2, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetCurrentSchemaID(0)
+	assert.NoError(t, err)
+	_, err = metadata.AddPartitionSpec(iceberg.UnpartitionedSpec, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetDefaultSpecID(0)
+	assert.NoError(t, err)
+	_, err = metadata.SetFormatVersion(2)
+	assert.NoError(t, err)
+
+	meta, err := metadata.Build()
+	assert.NoError(t, err)
+
+	var mockfs internal.MockFS
+	mockfs.Test(t)
+
+	tbl := table.New(
+		[]string{"test", "table"},
+		meta,
+		"test/location",
+		func(ctx context.Context) (iceio.IO, error) {
+			return &mockfs, nil
+		},
+		nil,
+	)
+
+	txn := tbl.NewTransaction()
+
+	// Create test data
+	mem := memory.DefaultAllocator
+	builder := array.NewInt32Builder(mem)
+	builder.AppendValues([]int32{1, 2, 3}, nil)
+	idArray := builder.NewArray()
+
+	builder2 := array.NewStringBuilder(mem)
+	builder2.AppendValues([]string{"a", "b", "c"}, nil)
+	dataArray := builder2.NewArray()
+
+	arrowSchema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+
+	record := array.NewRecord(arrowSchema, []arrow.Array{idArray, dataArray}, 3)
+	tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record})
+
+	// Should return an error for an unpartitioned table
+	err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "cannot apply dynamic overwrite on an unpartitioned table")
+}
+
+func TestDynamicPartitionOverwrite_NonIdentityTransform(t *testing.T) {
+	// Create a table with a non-identity transform
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	partitionSpec := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			SourceID:  1,
+			FieldID:   1000,
+			Name:      "id_bucket",
+			Transform: iceberg.BucketTransform{NumBuckets: 4},
+		},
+	)
+
+	metadata, err := table.NewMetadataBuilder()
+	assert.NoError(t, err)
+	_, err = metadata.AddSchema(schema, 2, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetCurrentSchemaID(0)
+	assert.NoError(t, err)
+	_, err = metadata.AddPartitionSpec(&partitionSpec, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetDefaultSpecID(0)
+	assert.NoError(t, err)
+	_, err = metadata.SetFormatVersion(2)
+	assert.NoError(t, err)
+
+	meta, err := metadata.Build()
+	assert.NoError(t, err)
+
+	var mockfs internal.MockFS
+	mockfs.Test(t)
+
+	tbl := table.New(
+		[]string{"test", "table"},
+		meta,
+		"test/location",
+		func(ctx context.Context) (iceio.IO, error) {
+			return &mockfs, nil
+		},
+		nil,
+	)
+
+	txn := tbl.NewTransaction()
+
+	// Create test data
+	mem := memory.DefaultAllocator
+	builder := array.NewInt32Builder(mem)
+	builder.AppendValues([]int32{1, 2, 3}, nil)
+	idArray := builder.NewArray()
+
+	builder2 := array.NewStringBuilder(mem)
+	builder2.AppendValues([]string{"a", "b", "c"}, nil)
+	dataArray := builder2.NewArray()
+
+	arrowSchema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+
+	record := array.NewRecord(arrowSchema, []arrow.Array{idArray, dataArray}, 3)
+	tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record})
+
+	// Should return an error for non-identity transform
+	err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "dynamic overwrite currently only supports identity-transform fields in partition spec (limitation, not spec requirement)")
+}
+
+func TestDynamicPartitionOverwrite_EmptyTable(t *testing.T) {
+	// Create a table with identity transform
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	partitionSpec := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			SourceID:  1,
+			FieldID:   1000,
+			Name:      "id",
+			Transform: iceberg.IdentityTransform{},
+		},
+	)
+
+	metadata, err := table.NewMetadataBuilder()
+	assert.NoError(t, err)
+	_, err = metadata.AddSchema(schema, 2, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetCurrentSchemaID(0)
+	assert.NoError(t, err)
+	_, err = metadata.AddPartitionSpec(&partitionSpec, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetDefaultSpecID(0)
+	assert.NoError(t, err)
+	_, err = metadata.SetFormatVersion(2)
+	assert.NoError(t, err)
+
+	meta, err := metadata.Build()
+	assert.NoError(t, err)
+
+	var mockfs internal.MockFS
+	mockfs.Test(t)
+
+	tbl := table.New(
+		[]string{"test", "table"},
+		meta,
+		"test/location",
+		func(ctx context.Context) (iceio.IO, error) {
+			return &mockfs, nil
+		},
+		nil,
+	)
+
+	txn := tbl.NewTransaction()
+
+	// Create empty test data (empty arrays for each field)
+	arrowSchema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+	mem := memory.DefaultAllocator
+	idArr := array.NewInt32Builder(mem).NewArray()
+	dataArr := array.NewStringBuilder(mem).NewArray()
+	record := array.NewRecord(arrowSchema, []arrow.Array{idArr, dataArr}, 0)
+	tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record})
+
+	// Should return no error for an empty table
+	err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{})
+	assert.NoError(t, err)
+}
+
+// TODO: Find a way to test the happy path of DynamicPartitionOverwrite
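+//
+// Such a test would need a writable FileIO rather than the MockFS used above. As an
+// illustrative sketch (helper names here are hypothetical, not part of this change):
+//
+//	tbl := createIdentityPartitionedTable(t)          // hypothetical helper backed by a real FS
+//	txn := tbl.NewTransaction()
+//	assert.NoError(t, txn.DynamicPartitionOverwrite(ctx, batch1, 1000, nil))
+//	// commit, then overwrite a single partition with new data and verify that only
+//	// rows from that partition were replaced while other partitions are untouched.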