Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ publish_changes creates a new snapshot from an existing snapshot without alterin

Only append and dynamic overwrite snapshots can be successfully published.

The `publish_changes` procedure will fail if there are multiple snapshots in the table with the provided `wap_id`.

!!! info
This procedure invalidates all cached Spark plans that reference the affected table.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ public void testApplyInvalidWapId() {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}

@TestTemplate
public void testApplyDuplicateWapId() {

String wapId = "wap_id_1";

sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);

spark.conf().set("spark.wap.id", wapId);

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

assertThatThrownBy(
() -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
.isInstanceOf(ValidationException.class)
.hasMessage(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
}

@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.apache.iceberg.spark.procedures;

import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
Expand Down Expand Up @@ -92,21 +90,26 @@ public InternalRow[] call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
Optional<Snapshot> wapSnapshot =
Optional.ofNullable(
Iterables.find(
table.snapshots(),
snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
null));
if (!wapSnapshot.isPresent()) {
Snapshot matchingSnap = null;
for (Snapshot snap : table.snapshots()) {
if (wapId.equals(WapUtil.stagedWapId(snap))) {
if (matchingSnap != null) {
throw new ValidationException(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
wapId);
} else {
matchingSnap = snap;
}
}
}

if (matchingSnap == null) {
throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}

long wapSnapshotId = wapSnapshot.get().snapshotId();
long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();

Snapshot currentSnapshot = table.currentSnapshot();

InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ public void testApplyInvalidWapId() {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}

@TestTemplate
public void testApplyDuplicateWapId() {

String wapId = "wap_id_1";

sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);

spark.conf().set("spark.wap.id", wapId);

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

assertThatThrownBy(
() -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
.isInstanceOf(ValidationException.class)
.hasMessage(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
}

@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.apache.iceberg.spark.procedures;

import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
Expand Down Expand Up @@ -92,21 +90,26 @@ public InternalRow[] call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
Optional<Snapshot> wapSnapshot =
Optional.ofNullable(
Iterables.find(
table.snapshots(),
snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
null));
if (!wapSnapshot.isPresent()) {
Snapshot matchingSnap = null;
for (Snapshot snap : table.snapshots()) {
if (wapId.equals(WapUtil.stagedWapId(snap))) {
if (matchingSnap != null) {
throw new ValidationException(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
wapId);
} else {
matchingSnap = snap;
}
}
}

if (matchingSnap == null) {
throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}

long wapSnapshotId = wapSnapshot.get().snapshotId();
long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();

Snapshot currentSnapshot = table.currentSnapshot();

InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,26 @@ public void testApplyInvalidWapId() {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}

@TestTemplate
public void testApplyDuplicateWapId() {

String wapId = "wap_id_1";

sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);

spark.conf().set("spark.wap.id", wapId);

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

assertThatThrownBy(
() -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
.isInstanceOf(ValidationException.class)
.hasMessage(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
}

@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.iceberg.spark.procedures;

import java.util.Iterator;
import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
Expand Down Expand Up @@ -97,21 +95,26 @@ public Iterator<Scan> call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
Optional<Snapshot> wapSnapshot =
Optional.ofNullable(
Iterables.find(
table.snapshots(),
snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
null));
if (!wapSnapshot.isPresent()) {
Snapshot matchingSnap = null;
for (Snapshot snap : table.snapshots()) {
if (wapId.equals(WapUtil.stagedWapId(snap))) {
if (matchingSnap != null) {
throw new ValidationException(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
wapId);
} else {
matchingSnap = snap;
}
}
}

if (matchingSnap == null) {
throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}

long wapSnapshotId = wapSnapshot.get().snapshotId();
long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();

Snapshot currentSnapshot = table.currentSnapshot();

InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return asScanIterator(OUTPUT_TYPE, outputRow);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,26 @@ public void testApplyInvalidWapId() {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}

@TestTemplate
public void testApplyDuplicateWapId() {

String wapId = "wap_id_1";

sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);

spark.conf().set("spark.wap.id", wapId);

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

assertThatThrownBy(
() -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
.isInstanceOf(ValidationException.class)
.hasMessage(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
}

@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.iceberg.spark.procedures;

import java.util.Iterator;
import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
Expand Down Expand Up @@ -97,21 +95,26 @@ public Iterator<Scan> call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
Optional<Snapshot> wapSnapshot =
Optional.ofNullable(
Iterables.find(
table.snapshots(),
snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
null));
if (!wapSnapshot.isPresent()) {
Snapshot matchingSnap = null;
for (Snapshot snap : table.snapshots()) {
if (wapId.equals(WapUtil.stagedWapId(snap))) {
if (matchingSnap != null) {
throw new ValidationException(
"Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
wapId);
} else {
matchingSnap = snap;
}
}
}

if (matchingSnap == null) {
throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}

long wapSnapshotId = wapSnapshot.get().snapshotId();
long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();

Snapshot currentSnapshot = table.currentSnapshot();

InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return asScanIterator(OUTPUT_TYPE, outputRow);
});
Expand Down