diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 7f211d9f260b..e29b00e5f81a 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -189,6 +189,8 @@ publish_changes creates a new snapshot from an existing snapshot without alterin Only append and dynamic overwrite snapshots can be successfully published. +The `publish_changes` procedure will fail if there are multiple snapshots in the table with the provided `wap_id`. + !!! info This procedure invalidates all cached Spark plans that reference the affected table. diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java index 08f44c8f01f2..d9319801d154 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java @@ -161,6 +161,26 @@ public void testApplyInvalidWapId() { .hasMessage("Cannot apply unknown WAP ID 'not_valid'"); } + @TestTemplate + public void testApplyDuplicateWapId() { + + String wapId = "wap_id_1"; + + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED); + + spark.conf().set("spark.wap.id", wapId); + + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + assertThatThrownBy( + () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'"); + } + @TestTemplate public void testInvalidApplyWapChangesCases() { assertThatThrownBy( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java index 2c3ce7418e08..a47e75415336 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java @@ -18,10 +18,8 @@ */ package org.apache.iceberg.spark.procedures; -import java.util.Optional; import org.apache.iceberg.Snapshot; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; import org.apache.iceberg.util.WapUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -92,21 +90,26 @@ public InternalRow[] call(InternalRow args) { return modifyIcebergTable( tableIdent, table -> { - Optional wapSnapshot = - Optional.ofNullable( - Iterables.find( - table.snapshots(), - snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), - null)); - if (!wapSnapshot.isPresent()) { + Snapshot matchingSnap = null; + for (Snapshot snap : table.snapshots()) { + if (wapId.equals(WapUtil.stagedWapId(snap))) { + if (matchingSnap != null) { + throw new ValidationException( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'", + wapId); + } else { + matchingSnap = snap; + } + } + } + + if (matchingSnap == null) { throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId); } - long wapSnapshotId = wapSnapshot.get().snapshotId(); + long wapSnapshotId = matchingSnap.snapshotId(); table.manageSnapshots().cherrypick(wapSnapshotId).commit(); - Snapshot currentSnapshot = table.currentSnapshot(); - InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId()); return new InternalRow[] {outputRow}; }); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java index 08f44c8f01f2..d9319801d154 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java @@ -161,6 +161,26 @@ public void testApplyInvalidWapId() { .hasMessage("Cannot apply unknown WAP ID 'not_valid'"); } + @TestTemplate + public void testApplyDuplicateWapId() { + + String wapId = "wap_id_1"; + + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED); + + spark.conf().set("spark.wap.id", wapId); + + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + assertThatThrownBy( + () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'"); + } + @TestTemplate public void testInvalidApplyWapChangesCases() { assertThatThrownBy( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java index 2c3ce7418e08..a47e75415336 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java @@ -18,10 +18,8 @@ */ package org.apache.iceberg.spark.procedures; -import java.util.Optional; import org.apache.iceberg.Snapshot; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; import org.apache.iceberg.util.WapUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -92,21 +90,26 @@ public InternalRow[] call(InternalRow args) { return modifyIcebergTable( tableIdent, table -> { - Optional wapSnapshot = - Optional.ofNullable( - Iterables.find( - table.snapshots(), - snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), - null)); - if (!wapSnapshot.isPresent()) { + Snapshot matchingSnap = null; + for (Snapshot snap : table.snapshots()) { + if (wapId.equals(WapUtil.stagedWapId(snap))) { + if (matchingSnap != null) { + throw new ValidationException( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'", + wapId); + } else { + matchingSnap = snap; + } + } + } + + if (matchingSnap == null) { throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId); } - long wapSnapshotId = wapSnapshot.get().snapshotId(); + long wapSnapshotId = matchingSnap.snapshotId(); table.manageSnapshots().cherrypick(wapSnapshotId).commit(); - Snapshot currentSnapshot = table.currentSnapshot(); - InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId()); return new InternalRow[] {outputRow}; }); diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java index 4958fde15d55..c72770e1cec6 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java @@ -159,6 +159,26 @@ public void testApplyInvalidWapId() { .hasMessage("Cannot apply unknown WAP ID 'not_valid'"); } + @TestTemplate + public void testApplyDuplicateWapId() { + + String wapId = "wap_id_1"; + + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED); + + spark.conf().set("spark.wap.id", wapId); + + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + assertThatThrownBy( + () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'"); + } + @TestTemplate public void testInvalidApplyWapChangesCases() { assertThatThrownBy( diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java index 874888204334..8cb0a2bfb759 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java @@ -19,10 +19,8 @@ package org.apache.iceberg.spark.procedures; import java.util.Iterator; -import java.util.Optional; import org.apache.iceberg.Snapshot; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; import org.apache.iceberg.util.WapUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -97,21 +95,26 @@ public Iterator call(InternalRow args) { return modifyIcebergTable( tableIdent, table -> { - Optional wapSnapshot = - Optional.ofNullable( - Iterables.find( - table.snapshots(), - snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), - null)); - if (!wapSnapshot.isPresent()) { + Snapshot matchingSnap = null; + for (Snapshot snap : table.snapshots()) { + if (wapId.equals(WapUtil.stagedWapId(snap))) { + if (matchingSnap != null) { + throw new ValidationException( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'", + wapId); + } else { + matchingSnap = snap; + } + } + } + + if (matchingSnap == null) { throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId); } - long wapSnapshotId = wapSnapshot.get().snapshotId(); + long wapSnapshotId = matchingSnap.snapshotId(); table.manageSnapshots().cherrypick(wapSnapshotId).commit(); - Snapshot currentSnapshot = table.currentSnapshot(); - InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId()); return asScanIterator(OUTPUT_TYPE, outputRow); }); diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java index 4958fde15d55..c72770e1cec6 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java @@ -159,6 +159,26 @@ public void testApplyInvalidWapId() { .hasMessage("Cannot apply unknown WAP ID 'not_valid'"); } + @TestTemplate + public void testApplyDuplicateWapId() { + + String wapId = "wap_id_1"; + + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED); + + spark.conf().set("spark.wap.id", wapId); + + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + assertThatThrownBy( + () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'"); + } + @TestTemplate public void testInvalidApplyWapChangesCases() { assertThatThrownBy( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java index 874888204334..8cb0a2bfb759 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java @@ -19,10 +19,8 @@ package org.apache.iceberg.spark.procedures; import java.util.Iterator; -import java.util.Optional; import org.apache.iceberg.Snapshot; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; import org.apache.iceberg.util.WapUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -97,21 +95,26 @@ public Iterator call(InternalRow args) { return modifyIcebergTable( tableIdent, table -> { - Optional wapSnapshot = - Optional.ofNullable( - Iterables.find( - table.snapshots(), - snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), - null)); - if (!wapSnapshot.isPresent()) { + Snapshot matchingSnap = null; + for (Snapshot snap : table.snapshots()) { + if (wapId.equals(WapUtil.stagedWapId(snap))) { + if (matchingSnap != null) { + throw new ValidationException( + "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'", + wapId); + } else { + matchingSnap = snap; + } + } + } + + if (matchingSnap == null) { throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId); } - long wapSnapshotId = wapSnapshot.get().snapshotId(); + long wapSnapshotId = matchingSnap.snapshotId(); table.manageSnapshots().cherrypick(wapSnapshotId).commit(); - Snapshot currentSnapshot = table.currentSnapshot(); - InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId()); return asScanIterator(OUTPUT_TYPE, outputRow); });