2 changes: 2 additions & 0 deletions docs/docs/spark-configuration.md
@@ -220,6 +220,8 @@ spark.read
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
| streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch |
| streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit |
| streaming-overwrite-mode | fail | How to handle overwrite snapshots in streaming reads: `fail` (throw exception), `skip` (ignore overwrite snapshots), `added-files-only` (process only added files, may produce duplicates) |
| streaming-skip-delete-snapshots | false | When true, ignores delete snapshots in streaming reads |
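For illustration only (the table name and option values below are placeholders, not part of this change), these streaming read options are passed the same way as other read options:

```scala
val df = spark.readStream
    .format("iceberg")
    // placeholder timestamp in milliseconds to stream from
    .option("stream-from-timestamp", "1704067200000")
    // soft caps on micro-batch size (placeholder values)
    .option("streaming-max-files-per-micro-batch", "100")
    .option("streaming-max-rows-per-micro-batch", "500000")
    .load("database.table_name")
```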

### Write options

10 changes: 9 additions & 1 deletion docs/docs/spark-structured-streaming.md
@@ -34,7 +34,15 @@ val df = spark.readStream
```

!!! warning
Iceberg only supports reading data from append snapshots. Overwrite snapshots cannot be processed and will cause an exception by default. Overwrites may be ignored by setting `streaming-skip-overwrite-snapshots=true`. Similarly, delete snapshots will cause an exception by default, and deletes may be ignored by setting `streaming-skip-delete-snapshots=true`.
Iceberg only supports reading data from append snapshots by default. Overwrite snapshots can be handled using the `streaming-overwrite-mode` option:

* `fail` (default): Throws an exception when an overwrite snapshot is encountered
* `skip`: Ignores overwrite snapshots entirely
* `added-files-only`: Processes only the added files from overwrite snapshots. **Warning:** This may produce duplicate records when overwrites rewrite existing data (e.g., MERGE, UPDATE, DELETE). Downstream processing must handle duplicates (e.g., idempotent writes, deduplication).

The deprecated `streaming-skip-overwrite-snapshots=true` option is equivalent to `streaming-overwrite-mode=skip`.

Similarly, delete snapshots will cause an exception by default, and deletes may be ignored by setting `streaming-skip-delete-snapshots=true`.
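
As a minimal, illustrative sketch (the table name and the `id` deduplication key are assumptions, not part of this change), `added-files-only` pairs naturally with downstream deduplication:

```scala
val df = spark.readStream
    .format("iceberg")
    // process only the added files from overwrite snapshots instead of failing
    .option("streaming-overwrite-mode", "added-files-only")
    .load("database.table_name")
    // rewritten rows may be re-emitted, so deduplicate on a business key (assumed `id` column)
    .dropDuplicates("id")
```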

### Limit input rate
To control the size of micro-batches in the DataFrame API, Iceberg supports two read options:
@@ -29,6 +29,8 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class for common Iceberg configs for Spark reads.
@@ -50,6 +52,8 @@
*/
public class SparkReadConf {

private static final Logger LOG = LoggerFactory.getLogger(SparkReadConf.class);

private static final String DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize";
private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G";
private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024 * 1024; // 256 MB
@@ -147,6 +151,11 @@ public boolean streamingSkipDeleteSnapshots() {
.parse();
}

/**
* @deprecated will be removed in a future release; use {@link #streamingOverwriteMode()}
* instead.
*/
@Deprecated
public boolean streamingSkipOverwriteSnapshots() {
return confParser
.booleanConf()
@@ -155,6 +164,26 @@ public boolean streamingSkipOverwriteSnapshots() {
.parse();
}

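/**
 * Returns the overwrite handling mode for streaming reads. The {@code streaming-overwrite-mode}
 * option takes precedence if set; otherwise the deprecated
 * {@code streaming-skip-overwrite-snapshots=true} flag maps to {@link StreamingOverwriteMode#SKIP},
 * and the default is {@link StreamingOverwriteMode#FAIL}.
 */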
public StreamingOverwriteMode streamingOverwriteMode() {
String newModeValue =
confParser.stringConf().option(SparkReadOptions.STREAMING_OVERWRITE_MODE).parseOptional();

if (newModeValue != null) {
return StreamingOverwriteMode.fromName(newModeValue);
}

if (streamingSkipOverwriteSnapshots()) {
LOG.warn(
"The option '{}' is deprecated and will be removed in a future release. "
+ "Please use '{}=skip' instead.",
SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS,
SparkReadOptions.STREAMING_OVERWRITE_MODE);
return StreamingOverwriteMode.SKIP;
}

return StreamingOverwriteMode.fromName(SparkReadOptions.STREAMING_OVERWRITE_MODE_DEFAULT);
}

public boolean parquetVectorizationEnabled() {
return confParser
.booleanConf()
@@ -69,11 +69,22 @@ private SparkReadOptions() {}
public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;

// skip snapshots of type overwrite while reading stream out of iceberg table
/**
* Skip snapshots of type overwrite while reading a stream out of an Iceberg table.
*
* @deprecated will be removed in a future release; use {@link #STREAMING_OVERWRITE_MODE} with
* value "skip" instead.
*/
@Deprecated
public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS =
"streaming-skip-overwrite-snapshots";

public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false;

// Configures how to handle overwrite snapshots during streaming reads: fail, skip, or added-files-only
public static final String STREAMING_OVERWRITE_MODE = "streaming-overwrite-mode";
public static final String STREAMING_OVERWRITE_MODE_DEFAULT = "fail";

// Controls whether to allow reading timestamps without zone info
@Deprecated
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Enumerates the modes for handling OVERWRITE snapshots during streaming reads. */
public enum StreamingOverwriteMode {
/** Throw an error when an OVERWRITE snapshot is encountered (default behavior). */
FAIL("fail"),

/** Skip OVERWRITE snapshots entirely. */
SKIP("skip"),

/** Process only the added files from OVERWRITE snapshots. */
ADDED_FILES_ONLY("added-files-only");

private final String modeName;

StreamingOverwriteMode(String modeName) {
this.modeName = modeName;
}

public String modeName() {
return modeName;
}

public static StreamingOverwriteMode fromName(String modeName) {
Preconditions.checkArgument(modeName != null, "Mode name is null");

if (FAIL.modeName().equalsIgnoreCase(modeName)) {
return FAIL;
} else if (SKIP.modeName().equalsIgnoreCase(modeName)) {
return SKIP;
} else if (ADDED_FILES_ONLY.modeName().equalsIgnoreCase(modeName)) {
return ADDED_FILES_ONLY;
} else {
throw new IllegalArgumentException("Unknown streaming overwrite mode: " + modeName);
}
}
}
@@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.StreamingOverwriteMode;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
@@ -84,7 +85,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
private final boolean skipDelete;
private final boolean skipOverwrite;
private final StreamingOverwriteMode overwriteMode;
private final long fromTimestamp;
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
@@ -116,7 +117,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
this.initialOffset = initialOffsetStore.initialOffset();

this.skipDelete = readConf.streamingSkipDeleteSnapshots();
this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
this.overwriteMode = readConf.streamingOverwriteMode();

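// added-files-only can re-emit rows rewritten by overwrite snapshots, so warn loudly at stream start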
if (overwriteMode == StreamingOverwriteMode.ADDED_FILES_ONLY) {
LOG.warn(
"Using '{}=added-files-only' mode with table '{}'. "
+ "This may produce duplicate records when overwrites rewrite existing data "
+ "(e.g., MERGE, UPDATE, DELETE). "
+ "Downstream processing must handle duplicates (e.g., idempotent writes, deduplication).",
SparkReadOptions.STREAMING_OVERWRITE_MODE,
table.name());
}
}

@Override
@@ -275,12 +286,22 @@ private boolean shouldProcess(Snapshot snapshot) {
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
return false;
case DataOperations.OVERWRITE:
Preconditions.checkState(
skipOverwrite,
"Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true",
snapshot.snapshotId(),
SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
return false;
switch (overwriteMode) {
case FAIL:
throw new IllegalStateException(
String.format(
"Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=skip, "
+ "or to process added files only, set %s=added-files-only",
snapshot.snapshotId(),
SparkReadOptions.STREAMING_OVERWRITE_MODE,
SparkReadOptions.STREAMING_OVERWRITE_MODE));
case SKIP:
return false;
case ADDED_FILES_ONLY:
return true;
default:
throw new IllegalStateException("Unknown overwrite mode: " + overwriteMode);
}
default:
throw new IllegalStateException(
String.format(
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.jupiter.api.Test;

public class TestStreamingOverwriteMode {

@Test
public void testFromNameFail() {
assertThat(StreamingOverwriteMode.fromName("fail")).isEqualTo(StreamingOverwriteMode.FAIL);
assertThat(StreamingOverwriteMode.fromName("FAIL")).isEqualTo(StreamingOverwriteMode.FAIL);
assertThat(StreamingOverwriteMode.fromName("Fail")).isEqualTo(StreamingOverwriteMode.FAIL);
}

@Test
public void testFromNameSkip() {
assertThat(StreamingOverwriteMode.fromName("skip")).isEqualTo(StreamingOverwriteMode.SKIP);
assertThat(StreamingOverwriteMode.fromName("SKIP")).isEqualTo(StreamingOverwriteMode.SKIP);
assertThat(StreamingOverwriteMode.fromName("Skip")).isEqualTo(StreamingOverwriteMode.SKIP);
}

@Test
public void testFromNameAddedFilesOnly() {
assertThat(StreamingOverwriteMode.fromName("added-files-only"))
.isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY);
assertThat(StreamingOverwriteMode.fromName("ADDED-FILES-ONLY"))
.isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY);
assertThat(StreamingOverwriteMode.fromName("Added-Files-Only"))
.isEqualTo(StreamingOverwriteMode.ADDED_FILES_ONLY);
}

@Test
public void testFromNameInvalid() {
assertThatThrownBy(() -> StreamingOverwriteMode.fromName("invalid"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Unknown streaming overwrite mode: invalid");
}

@Test
public void testFromNameNull() {
assertThatThrownBy(() -> StreamingOverwriteMode.fromName(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Mode name is null");
}

@Test
public void testModeName() {
assertThat(StreamingOverwriteMode.FAIL.modeName()).isEqualTo("fail");
assertThat(StreamingOverwriteMode.SKIP.modeName()).isEqualTo("skip");
assertThat(StreamingOverwriteMode.ADDED_FILES_ONLY.modeName()).isEqualTo("added-files-only");
}
}