Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ the table, the following tracks the current write support:
| Append Data Files | X |
| Rewrite Files | |
| Rewrite manifests | |
| Overwrite Files | |
| Overwrite Files | X |
| Write Pos Delete | |
| Write Eq Delete | |
| Row Delta | |
Expand Down
36 changes: 33 additions & 3 deletions internal/recipe/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,40 @@ def testSetProperties():


# Runs a Spark SQL COUNT(*) over default.test_partitioned_by_days and prints
# the result without truncating column values.
# NOTE(review): this span appears to carry both the original single-line call
# and its re-wrapped replacement from the diff; confirm only one form is kept.
def testAddedFile():
spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(truncate=False)
spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show(
truncate=False
)


# Prints the extended table description of default.go_test_different_data_types
# and then every row of that table, both without truncating column values.
# NOTE(review): the DESCRIBE call appears twice here (single-line and wrapped
# form), which looks like the diff's old and new lines; confirm only one is kept.
def testReadDifferentDataTypes():
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(truncate=False)
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_different_data_types").show(
truncate=False
)
spark.sql("SELECT * FROM default.go_test_different_data_types").show(truncate=False)


# Prints the extended table description of default.go_test_update_spec without
# truncating column values.
# NOTE(review): the DESCRIBE call appears twice here (single-line and wrapped
# form), which looks like the diff's old and new lines; confirm only one is kept.
def testReadSpecUpdate():
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(truncate=False)
spark.sql("DESCRIBE TABLE EXTENDED default.go_test_update_spec").show(
truncate=False
)


def testOverwriteBasic():
    """Print the row count and then the rows (ordered by ``baz``) of
    ``default.go_test_overwrite_basic``, without truncating column values."""
    queries = (
        "SELECT COUNT(*) FROM default.go_test_overwrite_basic",
        "SELECT * FROM default.go_test_overwrite_basic ORDER BY baz",
    )
    # Queries run in the listed order: count first, then the full listing.
    for query in queries:
        spark.sql(query).show(truncate=False)


def testOverwriteWithFilter():
    """Print the row count and then the rows (ordered by ``baz``) of
    ``default.go_test_overwrite_filter``, without truncating column values."""
    queries = (
        "SELECT COUNT(*) FROM default.go_test_overwrite_filter",
        "SELECT * FROM default.go_test_overwrite_filter ORDER BY baz",
    )
    # Queries run in the listed order: count first, then the full listing.
    for query in queries:
        spark.sql(query).show(truncate=False)


if __name__ == "__main__":
Expand All @@ -54,3 +78,9 @@ def testReadSpecUpdate():

if args.test == "TestReadSpecUpdate":
testReadSpecUpdate()

if args.test == "TestOverwriteBasic":
testOverwriteBasic()

if args.test == "TestOverwriteWithFilter":
testOverwriteWithFilter()
20 changes: 20 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps
return txn.Commit(ctx)
}

// OverwriteTable is a convenience wrapper that opens a new transaction on the
// table, calls the transaction's OverwriteTable with the given arguments
// unchanged, and commits it.
//
// If the transaction's OverwriteTable fails, it returns a nil table and that
// error without committing; otherwise it returns whatever Commit returns.
func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps iceberg.Properties) (*Table, error) {
	tx := t.NewTransaction()

	err := tx.OverwriteTable(ctx, tbl, batchSize, filter, caseSensitive, snapshotProps)
	if err != nil {
		return nil, err
	}

	return tx.Commit(ctx)
}

// Overwrite is a convenience wrapper that opens a new transaction on the
// table, calls the transaction's Overwrite with the given arguments unchanged,
// and commits it.
//
// If the transaction's Overwrite fails, it returns a nil table and that error
// without committing; otherwise it returns whatever Commit returns.
func (t Table) Overwrite(ctx context.Context, rdr array.RecordReader, filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps iceberg.Properties) (*Table, error) {
	tx := t.NewTransaction()

	err := tx.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
	if err != nil {
		return nil, err
	}

	return tx.Commit(ctx)
}

func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] {
fs, err := t.fsF(ctx)
if err != nil {
Expand Down
46 changes: 45 additions & 1 deletion table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (t *TableWritingTestSuite) createTable(identifier table.Identifier, formatV
func(ctx context.Context) (iceio.IO, error) {
return iceio.LocalFS{}, nil
},
nil,
&mockedCatalog{meta},
)
}

Expand Down Expand Up @@ -1281,6 +1281,50 @@ func (t *TableWritingTestSuite) TestMergeManifests() {
t.True(array.TableEqual(resultB, resultC), "expected:\n %s\ngot:\n %s", resultB, resultC)
}

// TestOverwriteTable verifies that Table.OverwriteTable properly delegates to
// Transaction.OverwriteTable: it overwrites a freshly created, unpartitioned
// table with a single-row Arrow table and checks the operation recorded on the
// resulting snapshot.
func (t *TableWritingTestSuite) TestOverwriteTable() {
	ident := table.Identifier{"default", "overwrite_table_wrapper_v" + strconv.Itoa(t.formatVersion)}
	tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)

	newTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
		`[{"foo": false, "bar": "wrapper_test", "baz": 123, "qux": "2024-01-01"}]`,
	})
	t.Require().NoError(err)
	defer newTable.Release()

	resultTbl, err := tbl.OverwriteTable(t.ctx, newTable, 1, nil, true, nil)
	t.Require().NoError(err)
	// Require (fatal) rather than assert: the values below are dereferenced,
	// so a nil must stop the test instead of panicking the whole suite.
	t.Require().NotNil(resultTbl)

	snapshot := resultTbl.CurrentSnapshot()
	t.Require().NotNil(snapshot)
	t.Require().NotNil(snapshot.Summary)
	t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
}

// TestOverwriteRecord verifies that Table.Overwrite properly delegates to
// Transaction.Overwrite: it overwrites a freshly created, unpartitioned table
// from a RecordReader holding a single row and checks the operation recorded
// on the resulting snapshot.
func (t *TableWritingTestSuite) TestOverwriteRecord() {
	ident := table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)}
	tbl := t.createTable(ident, t.formatVersion, *iceberg.UnpartitionedSpec, t.tableSchema)

	// Create test data as RecordReader
	testTable, err := array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
		`[{"foo": true, "bar": "record_test", "baz": 456, "qux": "2024-01-02"}]`,
	})
	t.Require().NoError(err)
	defer testTable.Release()

	rdr := array.NewTableReader(testTable, 1)
	defer rdr.Release()

	// Test that Table.Overwrite works (delegates to transaction)
	resultTbl, err := tbl.Overwrite(t.ctx, rdr, nil, true, nil)
	t.Require().NoError(err)
	// Require (fatal) rather than assert: the values below are dereferenced,
	// so a nil must stop the test instead of panicking the whole suite.
	t.Require().NotNil(resultTbl)

	// Verify the operation worked
	snapshot := resultTbl.CurrentSnapshot()
	t.Require().NotNil(snapshot)
	t.Require().NotNil(snapshot.Summary)
	t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table overwrite becomes append
}

func TestTableWriting(t *testing.T) {
suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
Expand Down
Loading
Loading