From 36fb20f5274820b3a17c28db5efba715dc619d37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Martins?= Date: Wed, 21 Jan 2026 13:05:46 +0000 Subject: [PATCH 1/3] Spark: Add streaming-overwrite-mode option for handling OVERWRITE snapshots --- docs/docs/spark-structured-streaming.md | 10 +- .../apache/iceberg/spark/SparkReadConf.java | 29 +++ .../iceberg/spark/SparkReadOptions.java | 13 +- .../iceberg/spark/StreamingOverwriteMode.java | 57 ++++++ .../spark/source/SparkMicroBatchStream.java | 37 +++- .../spark/TestStreamingOverwriteMode.java | 72 ++++++++ .../source/TestStructuredStreamingRead3.java | 171 ++++++++++++++++++ .../apache/iceberg/spark/SparkReadConf.java | 29 +++ .../iceberg/spark/SparkReadOptions.java | 13 +- .../iceberg/spark/StreamingOverwriteMode.java | 57 ++++++ .../spark/source/SparkMicroBatchStream.java | 37 +++- .../spark/TestStreamingOverwriteMode.java | 72 ++++++++ .../source/TestStructuredStreamingRead3.java | 171 ++++++++++++++++++ .../apache/iceberg/spark/SparkReadConf.java | 29 +++ .../iceberg/spark/SparkReadOptions.java | 13 +- .../iceberg/spark/StreamingOverwriteMode.java | 57 ++++++ .../spark/source/SparkMicroBatchStream.java | 37 +++- .../spark/TestStreamingOverwriteMode.java | 72 ++++++++ .../source/TestStructuredStreamingRead3.java | 171 ++++++++++++++++++ .../apache/iceberg/spark/SparkReadConf.java | 29 +++ .../iceberg/spark/SparkReadOptions.java | 13 +- .../iceberg/spark/StreamingOverwriteMode.java | 57 ++++++ .../spark/source/SparkMicroBatchStream.java | 37 +++- .../spark/TestStreamingOverwriteMode.java | 72 ++++++++ .../source/TestStructuredStreamingRead3.java | 171 ++++++++++++++++++ 25 files changed, 1489 insertions(+), 37 deletions(-) create mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java create mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java create mode 100644 spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java diff --git a/docs/docs/spark-structured-streaming.md b/docs/docs/spark-structured-streaming.md index e722df1ea4e7..3b6fe564cf93 100644 --- a/docs/docs/spark-structured-streaming.md +++ b/docs/docs/spark-structured-streaming.md @@ -34,7 +34,15 @@ val df = spark.readStream ``` !!! warning - Iceberg only supports reading data from append snapshots. Overwrite snapshots cannot be processed and will cause an exception by default. Overwrites may be ignored by setting `streaming-skip-overwrite-snapshots=true`. Similarly, delete snapshots will cause an exception by default, and deletes may be ignored by setting `streaming-skip-delete-snapshots=true`. + Iceberg only supports reading data from append snapshots by default. 
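+
+    For example, a stream that skips overwrite snapshots can be configured as follows (a minimal sketch; the table name is illustrative, and the available modes are described below):
+
+    ```scala
+    val df = spark.readStream
+        .format("iceberg")
+        .option("streaming-overwrite-mode", "skip")
+        .load("database.table_name")
+    ```
+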
Overwrite snapshots can be handled using the `streaming-overwrite-mode` option: + + * `fail` (default): Throws an exception when an overwrite snapshot is encountered + * `skip`: Ignores overwrite snapshots entirely + * `added-files-only`: Processes only the added files from overwrite snapshots. **Warning:** This may produce duplicate records when overwrites rewrite existing data (e.g., MERGE, UPDATE, DELETE). Downstream processing must handle duplicates (e.g., idempotent writes, deduplication). + + The deprecated `streaming-skip-overwrite-snapshots=true` option is equivalent to `streaming-overwrite-mode=skip`. + + Similarly, delete snapshots will cause an exception by default, and deletes may be ignored by setting `streaming-skip-delete-snapshots=true`. ### Limit input rate To control the size of micro-batches in the DataFrame API, Iceberg supports two read options: diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index dd7e2c20c1b9..10d4171d15f0 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -29,6 +29,8 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A class for common Iceberg configs for Spark reads. @@ -50,6 +52,8 @@ */ public class SparkReadConf { + private static final Logger LOG = LoggerFactory.getLogger(SparkReadConf.class); + private static final String DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize"; private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G"; private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024 * 1024; // 256 MB @@ -147,6 +151,11 @@ public boolean streamingSkipDeleteSnapshots() { .parse(); } + /** + * @deprecated and will be removed in a future release; use {@link #streamingOverwriteMode()} + * instead. + */ + @Deprecated public boolean streamingSkipOverwriteSnapshots() { return confParser .booleanConf() @@ -155,6 +164,26 @@ public boolean streamingSkipOverwriteSnapshots() { .parse(); } + public StreamingOverwriteMode streamingOverwriteMode() { + String newModeValue = + confParser.stringConf().option(SparkReadOptions.STREAMING_OVERWRITE_MODE).parseOptional(); + + if (newModeValue != null) { + return StreamingOverwriteMode.fromName(newModeValue); + } + + if (streamingSkipOverwriteSnapshots()) { + LOG.warn( + "The option '{}' is deprecated and will be removed in a future release. 
" + + "Please use '{}=skip' instead.", + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, + SparkReadOptions.STREAMING_OVERWRITE_MODE); + return StreamingOverwriteMode.SKIP; + } + + return StreamingOverwriteMode.fromName(SparkReadOptions.STREAMING_OVERWRITE_MODE_DEFAULT); + } + public boolean parquetVectorizationEnabled() { return confParser .booleanConf() diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index c31a7e55543d..38628a35174d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -69,11 +69,22 @@ private SparkReadOptions() {} public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots"; public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false; - // skip snapshots of type overwrite while reading stream out of iceberg table + /** + * Skip snapshots of type overwrite while reading stream out of iceberg table. + * + * @deprecated and will be removed in a future release; use {@link #STREAMING_OVERWRITE_MODE} with + * value "skip" instead. + */ + @Deprecated public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots"; + public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false; + // Configures how to handle overwrite snapshots during streaming reads + public static final String STREAMING_OVERWRITE_MODE = "streaming-overwrite-mode"; + public static final String STREAMING_OVERWRITE_MODE_DEFAULT = "fail"; + // Controls whether to allow reading timestamps without zone info @Deprecated public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE = diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java new file mode 100644 index 000000000000..cc8eb585e481 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Enumerates the modes for handling OVERWRITE snapshots during streaming reads. */ +public enum StreamingOverwriteMode { + /** Throw an error when an OVERWRITE snapshot is encountered (default behavior). */ + FAIL("fail"), + + /** Skip OVERWRITE snapshots entirely. */ + SKIP("skip"), + + /** Process only the added files from OVERWRITE snapshots. 
*/ + ADDED_FILES_ONLY("added-files-only"); + + private final String modeName; + + StreamingOverwriteMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static StreamingOverwriteMode fromName(String modeName) { + Preconditions.checkArgument(modeName != null, "Mode name is null"); + + if (FAIL.modeName().equalsIgnoreCase(modeName)) { + return FAIL; + } else if (SKIP.modeName().equalsIgnoreCase(modeName)) { + return SKIP; + } else if (ADDED_FILES_ONLY.modeName().equalsIgnoreCase(modeName)) { + return ADDED_FILES_ONLY; + } else { + throw new IllegalArgumentException("Unknown streaming overwrite mode: " + modeName); + } + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 4d8bb1772355..963297bc671b 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.StreamingOverwriteMode; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -84,7 +85,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; - private final boolean skipOverwrite; + private final StreamingOverwriteMode overwriteMode; private final long fromTimestamp; private final int maxFilesPerMicroBatch; private final int maxRecordsPerMicroBatch; @@ -116,7 +117,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.initialOffset = initialOffsetStore.initialOffset(); this.skipDelete = readConf.streamingSkipDeleteSnapshots(); - this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots(); + this.overwriteMode = readConf.streamingOverwriteMode(); + + if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { + LOG.warn( + "Using '{}=added-files-only' mode with table '{}'. " + + "This mode processes only added files from OVERWRITE snapshots, which may result in " + + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). 
" + + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + SparkReadOptions.STREAMING_OVERWRITE_MODE, + table.name()); + } } @Override @@ -275,12 +286,22 @@ private boolean shouldProcess(Snapshot snapshot) { SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); return false; case DataOperations.OVERWRITE: - Preconditions.checkState( - skipOverwrite, - "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", - snapshot.snapshotId(), - SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS); - return false; + switch (overwriteMode) { + case FAIL: + throw new IllegalStateException( + String.format( + "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=skip, " + + "or to process added files only, set %s=added-files-only", + snapshot.snapshotId(), + SparkReadOptions.STREAMING_OVERWRITE_MODE, + SparkReadOptions.STREAMING_OVERWRITE_MODE)); + case SKIP: + return false; + case ADDED_FILES_ONLY: + return true; + default: + throw new IllegalStateException("Unknown overwrite mode: " + overwriteMode); + } default: throw new IllegalStateException( String.format( diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java new file mode 100644 index 000000000000..fa5d01fda494 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestStreamingOverwriteMode { + + @Test + public void testFromNameFail() { + assertThat(StreamingOverwriteMode.fromName("fail")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("FAIL")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("Fail")).isEqualTo(StreamingOverwriteMode.FAIL); + } + + @Test + public void testFromNameSkip() { + assertThat(StreamingOverwriteMode.fromName("skip")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("SKIP")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("Skip")).isEqualTo(StreamingOverwriteMode.SKIP); + } + + @Test + public void testFromNameAddedFilesOnly() { + assertThat(StreamingOverwriteMode.fromName("added-files-only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("ADDED-FILES-ONLY")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("Added-Files-Only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + } + + @Test + public void testFromNameInvalid() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName("invalid")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown streaming overwrite mode: invalid"); + } + + @Test + public void testFromNameNull() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Mode name is null"); + } + + @Test + public void testModeName() { + assertThat(StreamingOverwriteMode.FAIL.modeName()).isEqualTo("fail"); + assertThat(StreamingOverwriteMode.SKIP.modeName()).isEqualTo("skip"); + assertThat(StreamingOverwriteMode.ADDED_FILES_ONLY.modeName()).isEqualTo("added-files-only"); + } +} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index a1ccb69aef97..c3e1ddbc438e 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -832,6 +833,176 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); } + @TestTemplate + public void testReadStreamWithOverwriteModeSkip() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + // this 
should create a snapshot with type overwrite. + table + .newOverwrite() + .addFile(dataFile) + .overwriteByRowFilter(Expressions.greaterThan("id", 4)) + .commit(); + + // check pre-condition - that the above operation resulted in Snapshot of Type OVERWRITE. + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "skip"); + assertThat(rowsAvailable(query)) + .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); + } + + @TestTemplate + public void testReadStreamWithOverwriteModeFail() throws Exception { + // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE. + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + // fill table with some initial data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + Schema deleteRowSchema = table.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList( + dataDelete.copy("data", "one") // id = 1 + ); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + dataDeletes, + deleteRowSchema); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit(); + + // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + // streaming-overwrite-mode=fail is the default, so we don't need to specify it + StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "fail"); + + assertThatThrownBy(query::processAllAvailable) + .cause() + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot process overwrite snapshot"); + } + + @TestTemplate + public void testReadStreamWithOverwriteModeAddedFilesOnly() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE. 
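+    // (this commit replaces the current table metadata with a copy upgraded to format version 2)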
+ TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + // fill table with some initial data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + // create additional records that will be added via overwrite + List newRecordsInOverwrite = + Lists.newArrayList( + new SimpleRecord(100, "hundred"), new SimpleRecord(103, "hundred-three")); + + // write the new records to data file(s) + appendData(newRecordsInOverwrite); + table.refresh(); + + // get all data files to use in overwrite (Spark may create multiple files) + Snapshot latestAppend = table.currentSnapshot(); + + // do an overwrite that re-adds and deletes all files (same pattern as makeRewriteDataFiles) + OverwriteFiles overwrite = table.newOverwrite(); + for (DataFile datafile : latestAppend.addedDataFiles(table.io())) { + overwrite.addFile(datafile).deleteFile(datafile); + } + overwrite.commit(); + + // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + StreamingQuery query = + startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "added-files-only"); + + // The stream should include the records from the OVERWRITE snapshot + List actual = rowsAvailable(query); + + // should contain all original data plus the records added in overwrite + List expected = Lists.newArrayList(Iterables.concat(dataAcrossSnapshots)); + expected.addAll(newRecordsInOverwrite); + // the overwrite adds all files again, so records appear twice + expected.addAll(newRecordsInOverwrite); + + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + + @TestTemplate + public void testOverwriteModeOptionTakesPrecedenceOverLegacyOption() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + // this should create a snapshot with type overwrite. + table + .newOverwrite() + .addFile(dataFile) + .overwriteByRowFilter(Expressions.greaterThan("id", 4)) + .commit(); + + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + // new option should take precedence over legacy option + // legacy says skip=true, but new option says fail - should fail + StreamingQuery query = + startStream( + ImmutableMap.of( + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, + "true", + SparkReadOptions.STREAMING_OVERWRITE_MODE, + "fail")); + + assertThatThrownBy(query::processAllAvailable) + .cause() + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot process overwrite snapshot"); + } + /** * We are testing that all the files in a rewrite snapshot are skipped Create a rewrite data files * snapshot using existing files. 
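Reviewer note on `added-files-only`: because re-added files are re-emitted, downstream jobs typically need to deduplicate. A minimal sketch of such a consumer, assuming hypothetical table, checkpoint, and column names (`event_id`, `event_ts`):

```scala
// Stream added files from overwrite snapshots; rewritten rows may reappear.
val events = spark.readStream
  .format("iceberg")
  .option("streaming-overwrite-mode", "added-files-only")
  .load("db.events")

// Deduplicate on a business key within the watermark before writing on.
val deduped = events
  .withWatermark("event_ts", "10 minutes")
  .dropDuplicates("event_id", "event_ts")

val query = deduped.writeStream
  .format("iceberg")
  .option("checkpointLocation", "/tmp/checkpoints/events_mirror")
  .toTable("db.events_mirror")
```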
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 9ea08c316919..f833affacb62 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -29,6 +29,8 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A class for common Iceberg configs for Spark reads. @@ -50,6 +52,8 @@ */ public class SparkReadConf { + private static final Logger LOG = LoggerFactory.getLogger(SparkReadConf.class); + private static final String DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize"; private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G"; private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024 * 1024; // 256 MB @@ -143,6 +147,11 @@ public boolean streamingSkipDeleteSnapshots() { .parse(); } + /** + * @deprecated and will be removed in a future release; use {@link #streamingOverwriteMode()} + * instead. + */ + @Deprecated public boolean streamingSkipOverwriteSnapshots() { return confParser .booleanConf() @@ -151,6 +160,26 @@ public boolean streamingSkipOverwriteSnapshots() { .parse(); } + public StreamingOverwriteMode streamingOverwriteMode() { + String newModeValue = + confParser.stringConf().option(SparkReadOptions.STREAMING_OVERWRITE_MODE).parseOptional(); + + if (newModeValue != null) { + return StreamingOverwriteMode.fromName(newModeValue); + } + + if (streamingSkipOverwriteSnapshots()) { + LOG.warn( + "The option '{}' is deprecated and will be removed in a future release. " + + "Please use '{}=skip' instead.", + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, + SparkReadOptions.STREAMING_OVERWRITE_MODE); + return StreamingOverwriteMode.SKIP; + } + + return StreamingOverwriteMode.fromName(SparkReadOptions.STREAMING_OVERWRITE_MODE_DEFAULT); + } + public boolean parquetVectorizationEnabled() { return confParser .booleanConf() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 17f2bfee69b8..35a09b069bcc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -69,11 +69,22 @@ private SparkReadOptions() {} public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots"; public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false; - // skip snapshots of type overwrite while reading stream out of iceberg table + /** + * Skip snapshots of type overwrite while reading stream out of iceberg table. + * + * @deprecated and will be removed in a future release; use {@link #STREAMING_OVERWRITE_MODE} with + * value "skip" instead. 
+ */ + @Deprecated public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots"; + public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false; + // Configures how to handle overwrite snapshots during streaming reads + public static final String STREAMING_OVERWRITE_MODE = "streaming-overwrite-mode"; + public static final String STREAMING_OVERWRITE_MODE_DEFAULT = "fail"; + // Controls whether to report locality information to Spark while allocating input partitions public static final String LOCALITY = "locality"; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java new file mode 100644 index 000000000000..cc8eb585e481 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Enumerates the modes for handling OVERWRITE snapshots during streaming reads. */ +public enum StreamingOverwriteMode { + /** Throw an error when an OVERWRITE snapshot is encountered (default behavior). */ + FAIL("fail"), + + /** Skip OVERWRITE snapshots entirely. */ + SKIP("skip"), + + /** Process only the added files from OVERWRITE snapshots. 
*/ + ADDED_FILES_ONLY("added-files-only"); + + private final String modeName; + + StreamingOverwriteMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static StreamingOverwriteMode fromName(String modeName) { + Preconditions.checkArgument(modeName != null, "Mode name is null"); + + if (FAIL.modeName().equalsIgnoreCase(modeName)) { + return FAIL; + } else if (SKIP.modeName().equalsIgnoreCase(modeName)) { + return SKIP; + } else if (ADDED_FILES_ONLY.modeName().equalsIgnoreCase(modeName)) { + return ADDED_FILES_ONLY; + } else { + throw new IllegalArgumentException("Unknown streaming overwrite mode: " + modeName); + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 4b79da813322..dfcda7451dbe 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.StreamingOverwriteMode; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -84,7 +85,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; - private final boolean skipOverwrite; + private final StreamingOverwriteMode overwriteMode; private final long fromTimestamp; private final int maxFilesPerMicroBatch; private final int maxRecordsPerMicroBatch; @@ -116,7 +117,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.initialOffset = initialOffsetStore.initialOffset(); this.skipDelete = readConf.streamingSkipDeleteSnapshots(); - this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots(); + this.overwriteMode = readConf.streamingOverwriteMode(); + + if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { + LOG.warn( + "Using '{}=added-files-only' mode with table '{}'. " + + "This mode processes only added files from OVERWRITE snapshots, which may result in " + + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). 
" + + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + SparkReadOptions.STREAMING_OVERWRITE_MODE, + table.name()); + } } @Override @@ -275,12 +286,22 @@ private boolean shouldProcess(Snapshot snapshot) { SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); return false; case DataOperations.OVERWRITE: - Preconditions.checkState( - skipOverwrite, - "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", - snapshot.snapshotId(), - SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS); - return false; + switch (overwriteMode) { + case FAIL: + throw new IllegalStateException( + String.format( + "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=skip, " + + "or to process added files only, set %s=added-files-only", + snapshot.snapshotId(), + SparkReadOptions.STREAMING_OVERWRITE_MODE, + SparkReadOptions.STREAMING_OVERWRITE_MODE)); + case SKIP: + return false; + case ADDED_FILES_ONLY: + return true; + default: + throw new IllegalStateException("Unknown overwrite mode: " + overwriteMode); + } default: throw new IllegalStateException( String.format( diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java new file mode 100644 index 000000000000..fa5d01fda494 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestStreamingOverwriteMode { + + @Test + public void testFromNameFail() { + assertThat(StreamingOverwriteMode.fromName("fail")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("FAIL")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("Fail")).isEqualTo(StreamingOverwriteMode.FAIL); + } + + @Test + public void testFromNameSkip() { + assertThat(StreamingOverwriteMode.fromName("skip")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("SKIP")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("Skip")).isEqualTo(StreamingOverwriteMode.SKIP); + } + + @Test + public void testFromNameAddedFilesOnly() { + assertThat(StreamingOverwriteMode.fromName("added-files-only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("ADDED-FILES-ONLY")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("Added-Files-Only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + } + + @Test + public void testFromNameInvalid() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName("invalid")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown streaming overwrite mode: invalid"); + } + + @Test + public void testFromNameNull() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Mode name is null"); + } + + @Test + public void testModeName() { + assertThat(StreamingOverwriteMode.FAIL.modeName()).isEqualTo("fail"); + assertThat(StreamingOverwriteMode.SKIP.modeName()).isEqualTo("skip"); + assertThat(StreamingOverwriteMode.ADDED_FILES_ONLY.modeName()).isEqualTo("added-files-only"); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 5a68b7a41488..f780e6beeaaa 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -820,6 +821,176 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); } + @TestTemplate + public void testReadStreamWithOverwriteModeSkip() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + // this 
should create a snapshot with type overwrite. + table + .newOverwrite() + .addFile(dataFile) + .overwriteByRowFilter(Expressions.greaterThan("id", 4)) + .commit(); + + // check pre-condition - that the above operation resulted in Snapshot of Type OVERWRITE. + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "skip"); + assertThat(rowsAvailable(query)) + .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); + } + + @TestTemplate + public void testReadStreamWithOverwriteModeFail() throws Exception { + // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE. + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + // fill table with some initial data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + Schema deleteRowSchema = table.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList( + dataDelete.copy("data", "one") // id = 1 + ); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + dataDeletes, + deleteRowSchema); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit(); + + // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + // streaming-overwrite-mode=fail is the default, so we don't need to specify it + StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "fail"); + + assertThatThrownBy(query::processAllAvailable) + .cause() + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot process overwrite snapshot"); + } + + @TestTemplate + public void testReadStreamWithOverwriteModeAddedFilesOnly() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE. 
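+    // (this commit replaces the current table metadata with a copy upgraded to format version 2)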
+ TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + // fill table with some initial data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + // create additional records that will be added via overwrite + List newRecordsInOverwrite = + Lists.newArrayList( + new SimpleRecord(100, "hundred"), new SimpleRecord(103, "hundred-three")); + + // write the new records to data file(s) + appendData(newRecordsInOverwrite); + table.refresh(); + + // get all data files to use in overwrite (Spark may create multiple files) + Snapshot latestAppend = table.currentSnapshot(); + + // do an overwrite that re-adds and deletes all files (same pattern as makeRewriteDataFiles) + OverwriteFiles overwrite = table.newOverwrite(); + for (DataFile datafile : latestAppend.addedDataFiles(table.io())) { + overwrite.addFile(datafile).deleteFile(datafile); + } + overwrite.commit(); + + // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + StreamingQuery query = + startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "added-files-only"); + + // The stream should include the records from the OVERWRITE snapshot + List actual = rowsAvailable(query); + + // should contain all original data plus the records added in overwrite + List expected = Lists.newArrayList(Iterables.concat(dataAcrossSnapshots)); + expected.addAll(newRecordsInOverwrite); + // the overwrite adds all files again, so records appear twice + expected.addAll(newRecordsInOverwrite); + + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + + @TestTemplate + public void testOverwriteModeOptionTakesPrecedenceOverLegacyOption() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + // this should create a snapshot with type overwrite. + table + .newOverwrite() + .addFile(dataFile) + .overwriteByRowFilter(Expressions.greaterThan("id", 4)) + .commit(); + + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); + + // new option should take precedence over legacy option + // legacy says skip=true, but new option says fail - should fail + StreamingQuery query = + startStream( + ImmutableMap.of( + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, + "true", + SparkReadOptions.STREAMING_OVERWRITE_MODE, + "fail")); + + assertThatThrownBy(query::processAllAvailable) + .cause() + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot process overwrite snapshot"); + } + /** * We are testing that all the files in a rewrite snapshot are skipped Create a rewrite data files * snapshot using existing files. 
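Migration note: per `streamingOverwriteMode()` above, the deprecated flag logs a warning and maps to `skip`, and the new option takes precedence when both are set. The one-line change, sketched against an illustrative table name:

```scala
// Deprecated: behaves as streaming-overwrite-mode=skip and logs a warning.
spark.readStream.format("iceberg")
  .option("streaming-skip-overwrite-snapshots", "true")
  .load("db.events")

// Equivalent replacement using the new option.
spark.readStream.format("iceberg")
  .option("streaming-overwrite-mode", "skip")
  .load("db.events")
```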
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 2788e160d526..07d02e4753c2 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -29,6 +29,8 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A class for common Iceberg configs for Spark reads. @@ -50,6 +52,8 @@ */ public class SparkReadConf { + private static final Logger LOG = LoggerFactory.getLogger(SparkReadConf.class); + private static final String DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize"; private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G"; private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024 * 1024; // 256 MB @@ -143,6 +147,11 @@ public boolean streamingSkipDeleteSnapshots() { .parse(); } + /** + * @deprecated and will be removed in a future release; use {@link #streamingOverwriteMode()} + * instead. + */ + @Deprecated public boolean streamingSkipOverwriteSnapshots() { return confParser .booleanConf() @@ -151,6 +160,26 @@ public boolean streamingSkipOverwriteSnapshots() { .parse(); } + public StreamingOverwriteMode streamingOverwriteMode() { + String newModeValue = + confParser.stringConf().option(SparkReadOptions.STREAMING_OVERWRITE_MODE).parseOptional(); + + if (newModeValue != null) { + return StreamingOverwriteMode.fromName(newModeValue); + } + + if (streamingSkipOverwriteSnapshots()) { + LOG.warn( + "The option '{}' is deprecated and will be removed in a future release. " + + "Please use '{}=skip' instead.", + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, + SparkReadOptions.STREAMING_OVERWRITE_MODE); + return StreamingOverwriteMode.SKIP; + } + + return StreamingOverwriteMode.fromName(SparkReadOptions.STREAMING_OVERWRITE_MODE_DEFAULT); + } + public boolean parquetVectorizationEnabled() { return confParser .booleanConf() diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 17f2bfee69b8..35a09b069bcc 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -69,11 +69,22 @@ private SparkReadOptions() {} public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots"; public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false; - // skip snapshots of type overwrite while reading stream out of iceberg table + /** + * Skip snapshots of type overwrite while reading stream out of iceberg table. + * + * @deprecated and will be removed in a future release; use {@link #STREAMING_OVERWRITE_MODE} with + * value "skip" instead. 
+ */ + @Deprecated public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots"; + public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false; + // Configures how to handle overwrite snapshots during streaming reads + public static final String STREAMING_OVERWRITE_MODE = "streaming-overwrite-mode"; + public static final String STREAMING_OVERWRITE_MODE_DEFAULT = "fail"; + // Controls whether to report locality information to Spark while allocating input partitions public static final String LOCALITY = "locality"; diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java new file mode 100644 index 000000000000..cc8eb585e481 --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Enumerates the modes for handling OVERWRITE snapshots during streaming reads. */ +public enum StreamingOverwriteMode { + /** Throw an error when an OVERWRITE snapshot is encountered (default behavior). */ + FAIL("fail"), + + /** Skip OVERWRITE snapshots entirely. */ + SKIP("skip"), + + /** Process only the added files from OVERWRITE snapshots. 
*/ + ADDED_FILES_ONLY("added-files-only"); + + private final String modeName; + + StreamingOverwriteMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static StreamingOverwriteMode fromName(String modeName) { + Preconditions.checkArgument(modeName != null, "Mode name is null"); + + if (FAIL.modeName().equalsIgnoreCase(modeName)) { + return FAIL; + } else if (SKIP.modeName().equalsIgnoreCase(modeName)) { + return SKIP; + } else if (ADDED_FILES_ONLY.modeName().equalsIgnoreCase(modeName)) { + return ADDED_FILES_ONLY; + } else { + throw new IllegalArgumentException("Unknown streaming overwrite mode: " + modeName); + } + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 60dd1f318ca5..50fec411b609 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.StreamingOverwriteMode; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -84,7 +85,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; - private final boolean skipOverwrite; + private final StreamingOverwriteMode overwriteMode; private final long fromTimestamp; private final int maxFilesPerMicroBatch; private final int maxRecordsPerMicroBatch; @@ -116,7 +117,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.initialOffset = initialOffsetStore.initialOffset(); this.skipDelete = readConf.streamingSkipDeleteSnapshots(); - this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots(); + this.overwriteMode = readConf.streamingOverwriteMode(); + + if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { + LOG.warn( + "Using '{}=added-files-only' mode with table '{}'. " + + "This mode processes only added files from OVERWRITE snapshots, which may result in " + + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). 
" + + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + SparkReadOptions.STREAMING_OVERWRITE_MODE, + table.name()); + } } @Override @@ -275,12 +286,22 @@ private boolean shouldProcess(Snapshot snapshot) { SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); return false; case DataOperations.OVERWRITE: - Preconditions.checkState( - skipOverwrite, - "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", - snapshot.snapshotId(), - SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS); - return false; + switch (overwriteMode) { + case FAIL: + throw new IllegalStateException( + String.format( + "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=skip, " + + "or to process added files only, set %s=added-files-only", + snapshot.snapshotId(), + SparkReadOptions.STREAMING_OVERWRITE_MODE, + SparkReadOptions.STREAMING_OVERWRITE_MODE)); + case SKIP: + return false; + case ADDED_FILES_ONLY: + return true; + default: + throw new IllegalStateException("Unknown overwrite mode: " + overwriteMode); + } default: throw new IllegalStateException( String.format( diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java new file mode 100644 index 000000000000..fa5d01fda494 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestStreamingOverwriteMode { + + @Test + public void testFromNameFail() { + assertThat(StreamingOverwriteMode.fromName("fail")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("FAIL")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("Fail")).isEqualTo(StreamingOverwriteMode.FAIL); + } + + @Test + public void testFromNameSkip() { + assertThat(StreamingOverwriteMode.fromName("skip")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("SKIP")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("Skip")).isEqualTo(StreamingOverwriteMode.SKIP); + } + + @Test + public void testFromNameAddedFilesOnly() { + assertThat(StreamingOverwriteMode.fromName("added-files-only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("ADDED-FILES-ONLY")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("Added-Files-Only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + } + + @Test + public void testFromNameInvalid() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName("invalid")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown streaming overwrite mode: invalid"); + } + + @Test + public void testFromNameNull() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Mode name is null"); + } + + @Test + public void testModeName() { + assertThat(StreamingOverwriteMode.FAIL.modeName()).isEqualTo("fail"); + assertThat(StreamingOverwriteMode.SKIP.modeName()).isEqualTo("skip"); + assertThat(StreamingOverwriteMode.ADDED_FILES_ONLY.modeName()).isEqualTo("added-files-only"); + } +} diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 1a726c365ace..62455b2de5e8 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -833,6 +834,176 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); } + @TestTemplate + public void testReadStreamWithOverwriteModeSkip() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + // this 
should create a snapshot with type overwrite.
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
+
+    // check pre-condition - that the above operation resulted in Snapshot of Type OVERWRITE.
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "skip");
+    assertThat(rowsAvailable(query))
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
+  }
+
+  @TestTemplate
+  public void testReadStreamWithOverwriteModeFail() throws Exception {
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some initial data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "one") // id = 1
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
+
+    // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // streaming-overwrite-mode=fail is the default; set it explicitly here to document the
+    // expected behavior
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "fail");
+
+    assertThatThrownBy(query::processAllAvailable)
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Cannot process overwrite snapshot");
+  }
+
+  @TestTemplate
+  public void testReadStreamWithOverwriteModeAddedFilesOnly() throws Exception {
+    table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
+
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some initial data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    // create additional records that will be added via overwrite
+    List<SimpleRecord> newRecordsInOverwrite =
+        Lists.newArrayList(
+            new SimpleRecord(100, "hundred"), new SimpleRecord(103, "hundred-three"));
+
+    // write the new records to data file(s)
+    appendData(newRecordsInOverwrite);
+    table.refresh();
+
+    // get all data files to use in overwrite (Spark may create multiple files)
+    Snapshot latestAppend = table.currentSnapshot();
+
+    // do an overwrite that re-adds and deletes all files (same pattern as makeRewriteDataFiles)
+    OverwriteFiles overwrite = table.newOverwrite();
+    for (DataFile datafile : latestAppend.addedDataFiles(table.io())) {
+      overwrite.addFile(datafile).deleteFile(datafile);
+    }
+    overwrite.commit();
+
+    // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    StreamingQuery query =
+        startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "added-files-only");
+
+    // The stream should include the records from the OVERWRITE snapshot
+    List<SimpleRecord> actual = rowsAvailable(query);
+
+    // should contain all original data plus the records added in overwrite
+    List<SimpleRecord> expected = Lists.newArrayList(Iterables.concat(dataAcrossSnapshots));
+    expected.addAll(newRecordsInOverwrite);
+    // the overwrite adds all files again, so records appear twice
+    expected.addAll(newRecordsInOverwrite);
+
+    assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  @TestTemplate
+  public void testOverwriteModeOptionTakesPrecedenceOverLegacyOption() throws Exception {
+    table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    // this should create a snapshot with type overwrite.
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
+
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // new option should take precedence over legacy option
+    // legacy says skip=true, but new option says fail - should fail
+    StreamingQuery query =
+        startStream(
+            ImmutableMap.of(
+                SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS,
+                "true",
+                SparkReadOptions.STREAMING_OVERWRITE_MODE,
+                "fail"));
+
+    assertThatThrownBy(query::processAllAvailable)
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Cannot process overwrite snapshot");
+  }
+
   /**
    * We are testing that all the files in a rewrite snapshot are skipped Create a rewrite data files
    * snapshot using existing files.
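For readers following the tests above, here is a hedged sketch of what the new option looks like from the user side. It is not part of the patch; the app name, table identifier, and checkpoint path are illustrative placeholders, and only `SparkReadOptions.STREAMING_OVERWRITE_MODE` and the mode value come from this change.

```java
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StreamingOverwriteModeExample {

  public static void main(String[] args) throws Exception {
    // assumes the session is launched with the Iceberg runtime on the classpath
    SparkSession spark = SparkSession.builder().appName("overwrite-mode-example").getOrCreate();

    // opt in to reading only the added files of OVERWRITE snapshots;
    // the default ("fail") keeps the pre-patch behavior of raising an error
    Dataset<Row> df =
        spark
            .readStream()
            .format("iceberg")
            .option(SparkReadOptions.STREAMING_OVERWRITE_MODE, "added-files-only")
            .load("db.target_table");

    df.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/overwrite-mode-example")
        .start()
        .awaitTermination();
  }
}
```

As the docs change warns, this mode can surface duplicates when an overwrite rewrote existing data, so the sink above would need to be idempotent or deduplicate in a real pipeline.
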
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 2788e160d526..07d02e4753c2 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -29,6 +29,8 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A class for common Iceberg configs for Spark reads. @@ -50,6 +52,8 @@ */ public class SparkReadConf { + private static final Logger LOG = LoggerFactory.getLogger(SparkReadConf.class); + private static final String DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize"; private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G"; private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024 * 1024; // 256 MB @@ -143,6 +147,11 @@ public boolean streamingSkipDeleteSnapshots() { .parse(); } + /** + * @deprecated and will be removed in a future release; use {@link #streamingOverwriteMode()} + * instead. + */ + @Deprecated public boolean streamingSkipOverwriteSnapshots() { return confParser .booleanConf() @@ -151,6 +160,26 @@ public boolean streamingSkipOverwriteSnapshots() { .parse(); } + public StreamingOverwriteMode streamingOverwriteMode() { + String newModeValue = + confParser.stringConf().option(SparkReadOptions.STREAMING_OVERWRITE_MODE).parseOptional(); + + if (newModeValue != null) { + return StreamingOverwriteMode.fromName(newModeValue); + } + + if (streamingSkipOverwriteSnapshots()) { + LOG.warn( + "The option '{}' is deprecated and will be removed in a future release. " + + "Please use '{}=skip' instead.", + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, + SparkReadOptions.STREAMING_OVERWRITE_MODE); + return StreamingOverwriteMode.SKIP; + } + + return StreamingOverwriteMode.fromName(SparkReadOptions.STREAMING_OVERWRITE_MODE_DEFAULT); + } + public boolean parquetVectorizationEnabled() { return confParser .booleanConf() diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 17f2bfee69b8..35a09b069bcc 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -69,11 +69,22 @@ private SparkReadOptions() {} public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots"; public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false; - // skip snapshots of type overwrite while reading stream out of iceberg table + /** + * Skip snapshots of type overwrite while reading stream out of iceberg table. + * + * @deprecated and will be removed in a future release; use {@link #STREAMING_OVERWRITE_MODE} with + * value "skip" instead. 
+ */ + @Deprecated public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots"; + public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false; + // Configures how to handle overwrite snapshots during streaming reads + public static final String STREAMING_OVERWRITE_MODE = "streaming-overwrite-mode"; + public static final String STREAMING_OVERWRITE_MODE_DEFAULT = "fail"; + // Controls whether to report locality information to Spark while allocating input partitions public static final String LOCALITY = "locality"; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java new file mode 100644 index 000000000000..cc8eb585e481 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/StreamingOverwriteMode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Enumerates the modes for handling OVERWRITE snapshots during streaming reads. */ +public enum StreamingOverwriteMode { + /** Throw an error when an OVERWRITE snapshot is encountered (default behavior). */ + FAIL("fail"), + + /** Skip OVERWRITE snapshots entirely. */ + SKIP("skip"), + + /** Process only the added files from OVERWRITE snapshots. 
*/ + ADDED_FILES_ONLY("added-files-only"); + + private final String modeName; + + StreamingOverwriteMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static StreamingOverwriteMode fromName(String modeName) { + Preconditions.checkArgument(modeName != null, "Mode name is null"); + + if (FAIL.modeName().equalsIgnoreCase(modeName)) { + return FAIL; + } else if (SKIP.modeName().equalsIgnoreCase(modeName)) { + return SKIP; + } else if (ADDED_FILES_ONLY.modeName().equalsIgnoreCase(modeName)) { + return ADDED_FILES_ONLY; + } else { + throw new IllegalArgumentException("Unknown streaming overwrite mode: " + modeName); + } + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 60dd1f318ca5..50fec411b609 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.StreamingOverwriteMode; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -84,7 +85,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; - private final boolean skipOverwrite; + private final StreamingOverwriteMode overwriteMode; private final long fromTimestamp; private final int maxFilesPerMicroBatch; private final int maxRecordsPerMicroBatch; @@ -116,7 +117,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.initialOffset = initialOffsetStore.initialOffset(); this.skipDelete = readConf.streamingSkipDeleteSnapshots(); - this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots(); + this.overwriteMode = readConf.streamingOverwriteMode(); + + if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { + LOG.warn( + "Using '{}=added-files-only' mode with table '{}'. " + + "This mode processes only added files from OVERWRITE snapshots, which may result in " + + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). 
" + + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + SparkReadOptions.STREAMING_OVERWRITE_MODE, + table.name()); + } } @Override @@ -275,12 +286,22 @@ private boolean shouldProcess(Snapshot snapshot) { SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); return false; case DataOperations.OVERWRITE: - Preconditions.checkState( - skipOverwrite, - "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", - snapshot.snapshotId(), - SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS); - return false; + switch (overwriteMode) { + case FAIL: + throw new IllegalStateException( + String.format( + "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=skip, " + + "or to process added files only, set %s=added-files-only", + snapshot.snapshotId(), + SparkReadOptions.STREAMING_OVERWRITE_MODE, + SparkReadOptions.STREAMING_OVERWRITE_MODE)); + case SKIP: + return false; + case ADDED_FILES_ONLY: + return true; + default: + throw new IllegalStateException("Unknown overwrite mode: " + overwriteMode); + } default: throw new IllegalStateException( String.format( diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java new file mode 100644 index 000000000000..fa5d01fda494 --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestStreamingOverwriteMode.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestStreamingOverwriteMode { + + @Test + public void testFromNameFail() { + assertThat(StreamingOverwriteMode.fromName("fail")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("FAIL")).isEqualTo(StreamingOverwriteMode.FAIL); + assertThat(StreamingOverwriteMode.fromName("Fail")).isEqualTo(StreamingOverwriteMode.FAIL); + } + + @Test + public void testFromNameSkip() { + assertThat(StreamingOverwriteMode.fromName("skip")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("SKIP")).isEqualTo(StreamingOverwriteMode.SKIP); + assertThat(StreamingOverwriteMode.fromName("Skip")).isEqualTo(StreamingOverwriteMode.SKIP); + } + + @Test + public void testFromNameAddedFilesOnly() { + assertThat(StreamingOverwriteMode.fromName("added-files-only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("ADDED-FILES-ONLY")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + assertThat(StreamingOverwriteMode.fromName("Added-Files-Only")) + .isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY); + } + + @Test + public void testFromNameInvalid() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName("invalid")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown streaming overwrite mode: invalid"); + } + + @Test + public void testFromNameNull() { + assertThatThrownBy(() -> StreamingOverwriteMode.fromName(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Mode name is null"); + } + + @Test + public void testModeName() { + assertThat(StreamingOverwriteMode.FAIL.modeName()).isEqualTo("fail"); + assertThat(StreamingOverwriteMode.SKIP.modeName()).isEqualTo("skip"); + assertThat(StreamingOverwriteMode.ADDED_FILES_ONLY.modeName()).isEqualTo("added-files-only"); + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 1a726c365ace..62455b2de5e8 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -833,6 +834,176 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); } + @TestTemplate + public void testReadStreamWithOverwriteModeSkip() throws Exception { + table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); + + // fill table with some data + List> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS; + appendDataAsMultipleSnapshots(dataAcrossSnapshots); + + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + + // this 
should create a snapshot with type overwrite.
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
+
+    // check pre-condition - that the above operation resulted in Snapshot of Type OVERWRITE.
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "skip");
+    assertThat(rowsAvailable(query))
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
+  }
+
+  @TestTemplate
+  public void testReadStreamWithOverwriteModeFail() throws Exception {
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some initial data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "one") // id = 1
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
+
+    // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // streaming-overwrite-mode=fail is the default; set it explicitly here to document the
+    // expected behavior
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "fail");
+
+    assertThatThrownBy(query::processAllAvailable)
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Cannot process overwrite snapshot");
+  }
+
+  @TestTemplate
+  public void testReadStreamWithOverwriteModeAddedFilesOnly() throws Exception {
+    table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
+
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some initial data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    // create additional records that will be added via overwrite
+    List<SimpleRecord> newRecordsInOverwrite =
+        Lists.newArrayList(
+            new SimpleRecord(100, "hundred"), new SimpleRecord(103, "hundred-three"));
+
+    // write the new records to data file(s)
+    appendData(newRecordsInOverwrite);
+    table.refresh();
+
+    // get all data files to use in overwrite (Spark may create multiple files)
+    Snapshot latestAppend = table.currentSnapshot();
+
+    // do an overwrite that re-adds and deletes all files (same pattern as makeRewriteDataFiles)
+    OverwriteFiles overwrite = table.newOverwrite();
+    for (DataFile datafile : latestAppend.addedDataFiles(table.io())) {
+      overwrite.addFile(datafile).deleteFile(datafile);
+    }
+    overwrite.commit();
+
+    // check pre-condition - that the above operation resulted in snapshot of type OVERWRITE
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    StreamingQuery query =
+        startStream(SparkReadOptions.STREAMING_OVERWRITE_MODE, "added-files-only");
+
+    // The stream should include the records from the OVERWRITE snapshot
+    List<SimpleRecord> actual = rowsAvailable(query);
+
+    // should contain all original data plus the records added in overwrite
+    List<SimpleRecord> expected = Lists.newArrayList(Iterables.concat(dataAcrossSnapshots));
+    expected.addAll(newRecordsInOverwrite);
+    // the overwrite adds all files again, so records appear twice
+    expected.addAll(newRecordsInOverwrite);
+
+    assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  @TestTemplate
+  public void testOverwriteModeOptionTakesPrecedenceOverLegacyOption() throws Exception {
+    table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    // this should create a snapshot with type overwrite.
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
+
+    assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // new option should take precedence over legacy option
+    // legacy says skip=true, but new option says fail - should fail
+    StreamingQuery query =
+        startStream(
+            ImmutableMap.of(
+                SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS,
+                "true",
+                SparkReadOptions.STREAMING_OVERWRITE_MODE,
+                "fail"));
+
+    assertThatThrownBy(query::processAllAvailable)
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Cannot process overwrite snapshot");
+  }
+
   /**
    * We are testing that all the files in a rewrite snapshot are skipped Create a rewrite data files
    * snapshot using existing files.
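Before the follow-up patches below, here is a hedged, standalone sketch of the resolution order that `SparkReadConf#streamingOverwriteMode` implements: the new option always wins, then the deprecated boolean flag maps onto `skip`, then the default applies. The class and method names in the sketch are illustrative; only the option keys and values come from the patch.

```java
import java.util.Map;

public class OverwriteModeResolution {

  // Resolves the effective mode the way the patch does, reduced to plain
  // strings so it runs without a Spark session.
  static String resolve(Map<String, String> options) {
    String mode = options.get("streaming-overwrite-mode");
    if (mode != null) {
      return mode.toLowerCase(); // fromName matching is case-insensitive in the patch
    }

    if (Boolean.parseBoolean(
        options.getOrDefault("streaming-skip-overwrite-snapshots", "false"))) {
      return "skip"; // deprecated flag is honored only when the new option is absent
    }

    return "fail"; // STREAMING_OVERWRITE_MODE_DEFAULT
  }

  public static void main(String[] args) {
    System.out.println(resolve(Map.of())); // fail
    System.out.println(resolve(Map.of("streaming-skip-overwrite-snapshots", "true"))); // skip
    // the new option takes precedence over the legacy flag, mirroring
    // testOverwriteModeOptionTakesPrecedenceOverLegacyOption
    System.out.println(
        resolve(
            Map.of(
                "streaming-skip-overwrite-snapshots", "true",
                "streaming-overwrite-mode", "fail"))); // fail
  }
}
```
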
From 20c4c9c6c27f3150b89d43af089ad90da34b00d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Martins?= Date: Wed, 21 Jan 2026 13:10:47 +0000 Subject: [PATCH 2/3] Spark: Add streaming-overwrite-mode option for handling OVERWRITE snapshots --- .../apache/iceberg/spark/source/SparkMicroBatchStream.java | 6 +++--- .../apache/iceberg/spark/source/SparkMicroBatchStream.java | 6 +++--- .../apache/iceberg/spark/source/SparkMicroBatchStream.java | 6 +++--- .../apache/iceberg/spark/source/SparkMicroBatchStream.java | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 963297bc671b..a7d5d6cd4220 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -122,9 +122,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { LOG.warn( "Using '{}=added-files-only' mode with table '{}'. " - + "This mode processes only added files from OVERWRITE snapshots, which may result in " - + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). " - + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + + "This may produce duplicate records when overwrites rewrite existing data " + + "(e.g., MERGE, UPDATE, DELETE). " + + "Downstream processing must handle duplicates (e.g., idempotent writes, deduplication).", SparkReadOptions.STREAMING_OVERWRITE_MODE, table.name()); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index dfcda7451dbe..9e548d7266a7 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -122,9 +122,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { LOG.warn( "Using '{}=added-files-only' mode with table '{}'. " - + "This mode processes only added files from OVERWRITE snapshots, which may result in " - + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). " - + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + + "This may produce duplicate records when overwrites rewrite existing data " + + "(e.g., MERGE, UPDATE, DELETE). 
" + + "Downstream processing must handle duplicates (e.g., idempotent writes, deduplication).", SparkReadOptions.STREAMING_OVERWRITE_MODE, table.name()); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 50fec411b609..8f3e311d3231 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -122,9 +122,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { LOG.warn( "Using '{}=added-files-only' mode with table '{}'. " - + "This mode processes only added files from OVERWRITE snapshots, which may result in " - + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). " - + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + + "This may produce duplicate records when overwrites rewrite existing data " + + "(e.g., MERGE, UPDATE, DELETE). " + + "Downstream processing must handle duplicates (e.g., idempotent writes, deduplication).", SparkReadOptions.STREAMING_OVERWRITE_MODE, table.name()); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 50fec411b609..8f3e311d3231 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -122,9 +122,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) { LOG.warn( "Using '{}=added-files-only' mode with table '{}'. " - + "This mode processes only added files from OVERWRITE snapshots, which may result in " - + "duplicate records if the overwrite rewrote existing data (e.g., MERGE, UPDATE, DELETE). " - + "This is safe when overwrites contain only new data (e.g., INSERT OVERWRITE to new partitions).", + + "This may produce duplicate records when overwrites rewrite existing data " + + "(e.g., MERGE, UPDATE, DELETE). 
" + + "Downstream processing must handle duplicates (e.g., idempotent writes, deduplication).", SparkReadOptions.STREAMING_OVERWRITE_MODE, table.name()); } From b61f71bb21700ae675a43ceb4d79513f850f277c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Martins?= Date: Fri, 23 Jan 2026 15:25:12 +0000 Subject: [PATCH 3/3] update spark-configuration.md --- docs/docs/spark-configuration.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md index 6dee8244854e..53cf4be268bb 100644 --- a/docs/docs/spark-configuration.md +++ b/docs/docs/spark-configuration.md @@ -220,6 +220,8 @@ spark.read | stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used | | streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch | | streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit | +| streaming-overwrite-mode | fail | How to handle overwrite snapshots in streaming reads: `fail` (throw exception), `skip` (ignore overwrite snapshots), `added-files-only` (process only added files, may produce duplicates) | +| streaming-skip-delete-snapshots | false | When true, ignores delete snapshots in streaming reads | ### Write options