From e5db0117517f70573366d619140607a3c8f5af2a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 18 Jun 2025 15:22:12 -0400 Subject: [PATCH 1/3] ITs for RESTCatalog using BLMS --- .../IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/iceberg/build.gradle | 1 + .../iceberg/catalog/IcebergCatalogBaseIT.java | 28 ++++--- .../io/iceberg/catalog/RESTCatalogBLMSIT.java | 76 +++++++++++++++++++ 4 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 37dd25bf9029..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3 + "modification": 2 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index e4e7e2f1095b..437c4bb2d04e 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -95,6 +95,7 @@ dependencies { testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation library.java.google_api_services_bigquery + testImplementation library.java.google_auth_library_oauth2_http testRuntimeOnly library.java.slf4j_jdk14 testImplementation project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java") diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index b0d0d159b096..e281f7bae2fe 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.iceberg.catalog; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema; import static org.apache.beam.sdk.managed.Managed.ICEBERG; import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; @@ -312,7 +314,7 @@ public Row apply(Long num) { }; protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA = - IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); + beamSchemaToIcebergSchema(BEAM_SCHEMA); protected static final SimpleFunction RECORD_FUNC = new SimpleFunction() { @Override @@ -345,7 +347,7 @@ private List populateTable(Table table, @Nullable String charOverride) thro } DataWriter writer = Parquet.writeData(file) - .schema(ICEBERG_SCHEMA) + .schema(table.schema()) .createWriterFunc(GenericParquetWriter::buildWriter) .overwrite() .withSpec(table.spec()) @@ -629,7 +631,7 @@ public void testWrite() throws IOException { pipeline.run().waitUntilFinish(); Table table = catalog.loadTable(TableIdentifier.parse(tableId())); - assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA)); + assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(table.schema())); // Read back and check records are correct List returnedRecords = readRecords(table); @@ -641,9 +643,9 @@ public void testWrite() throws IOException { public void testWriteToPartitionedTable() throws IOException { Map config = new HashMap<>(managedIcebergConfig(tableId())); int truncLength = "value_x".length(); - config.put( - "partition_fields", - Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")")); + List partitionFields = + Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")"); + config.put("partition_fields", partitionFields); PCollection input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); @@ -651,6 +653,13 @@ public void testWriteToPartitionedTable() throws IOException { // Read back and check records are correct Table table = catalog.loadTable(TableIdentifier.parse(tableId())); List returnedRecords = readRecords(table); + PartitionSpec expectedSpec = + PartitionSpec.builderFor(table.schema()) + .identity("bool_field") + .hour("datetime") + .truncate("str", truncLength) + .build(); + assertEquals(expectedSpec, table.spec()); assertThat( returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } @@ -785,10 +794,8 @@ private void writeToDynamicDestinations( Table table3 = catalog.loadTable(TableIdentifier.parse(tableId() + "_3_d")); Table table4 = catalog.loadTable(TableIdentifier.parse(tableId() + "_4_e")); - org.apache.iceberg.Schema tableSchema = - IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema()); for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) { - assertTrue(t.schema().sameSchema(tableSchema)); + assertEquals(rowFilter.outputSchema(), icebergSchemaToBeamSchema(t.schema())); } // Read back and check records are correct @@ -800,6 +807,7 @@ private void writeToDynamicDestinations( readRecords(table3), readRecords(table4)); + org.apache.iceberg.Schema tableSchema = beamSchemaToIcebergSchema(rowFilter.outputSchema()); SerializableFunction recordFunc = row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row); @@ -906,7 +914,7 @@ public void testWriteToDynamicNamespaces() throws IOException { table3false, table4true, table4false)) { - assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA)); + assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(t.schema())); } // Read back and check records are correct diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java new file mode 100644 index 000000000000..9b5225ed6b03 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.catalog; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import java.io.IOException; +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.rest.RESTCatalog; +import org.junit.BeforeClass; + +/** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake Metastore. */ +public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT { + private static final String BIGLAKE_URI = + "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"; + private static Map catalogProps; + + @BeforeClass + public static void setup() throws IOException { + GoogleCredentials credentials = + ServiceAccountCredentials.getApplicationDefault() + .createScoped("https://www.googleapis.com/auth/cloud-platform"); + credentials.refreshIfExpired(); + String accessToken = credentials.getAccessToken().getTokenValue(); + + catalogProps = + ImmutableMap.builder() + .put("type", "rest") + .put("uri", BIGLAKE_URI) + .put("warehouse", "gs://managed-iceberg-integration-tests-us-central1") + .put("oauth2-server-uri", "https://oauth2.googleapis.com/token") + .put("header.x-goog-user-project", OPTIONS.getProject()) + .put("rest-metrics-reporting-enabled", "false") + .put("token", accessToken) + .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO") + .build(); + } + + @Override + public String type() { + return "rest"; + } + + @Override + public Catalog createCatalog() { + return CatalogUtil.loadCatalog( + RESTCatalog.class.getName(), catalogName, catalogProps, new Configuration()); + } + + @Override + public Map managedIcebergConfig(String tableId) { + return ImmutableMap.builder() + .put("table", tableId) + .put("catalog_properties", catalogProps) + .build(); + } +} From 80feea40ae67b4e1b6506eaa5ddaf9aa608d7df5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 17 Feb 2026 17:33:44 -0500 Subject: [PATCH 2/3] update rest catalog config --- .../io/iceberg/catalog/RESTCatalogBLMSIT.java | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java index 9b5225ed6b03..a4c9ef73362c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java @@ -17,53 +17,40 @@ */ package org.apache.beam.sdk.io.iceberg.catalog; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.auth.oauth2.ServiceAccountCredentials; -import java.io.IOException; import java.util.Map; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.rest.RESTCatalog; import org.junit.BeforeClass; /** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake Metastore. */ public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT { - private static final String BIGLAKE_URI = - "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"; private static Map catalogProps; @BeforeClass - public static void setup() throws IOException { - GoogleCredentials credentials = - ServiceAccountCredentials.getApplicationDefault() - .createScoped("https://www.googleapis.com/auth/cloud-platform"); - credentials.refreshIfExpired(); - String accessToken = credentials.getAccessToken().getTokenValue(); - + public static void setup() { catalogProps = ImmutableMap.builder() .put("type", "rest") - .put("uri", BIGLAKE_URI) - .put("warehouse", "gs://managed-iceberg-integration-tests-us-central1") - .put("oauth2-server-uri", "https://oauth2.googleapis.com/token") + .put("uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog") + .put("warehouse", warehouse(RESTCatalogBLMSIT.class)) .put("header.x-goog-user-project", OPTIONS.getProject()) .put("rest-metrics-reporting-enabled", "false") - .put("token", accessToken) .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO") + .put("rest.auth.type", "org.apache.iceberg.gcp.auth.GoogleAuthManager") .build(); } @Override public String type() { - return "rest"; + return "biglake"; } @Override public Catalog createCatalog() { - return CatalogUtil.loadCatalog( - RESTCatalog.class.getName(), catalogName, catalogProps, new Configuration()); + RESTCatalog restCatalog = new RESTCatalog(); + restCatalog.initialize(catalogName, catalogProps); + return restCatalog; } @Override From 5ab72fe879992ac028e313719da26926ccc60cba Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 18 Feb 2026 13:39:50 -0500 Subject: [PATCH 3/3] use top-level gcs bucket for warehouse --- .../sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java index a4c9ef73362c..c16df763333f 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java @@ -21,19 +21,25 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.rest.RESTCatalog; +import org.junit.After; import org.junit.BeforeClass; /** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake Metastore. */ public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT { private static Map catalogProps; + // Using a special bucket for this test class because + // BigLake does not support using subfolders as a warehouse (yet) + private static final String BIGLAKE_WAREHOUSE = "gs://managed-iceberg-biglake-its"; + @BeforeClass public static void setup() { + warehouse = BIGLAKE_WAREHOUSE; catalogProps = ImmutableMap.builder() .put("type", "rest") .put("uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog") - .put("warehouse", warehouse(RESTCatalogBLMSIT.class)) + .put("warehouse", BIGLAKE_WAREHOUSE) .put("header.x-goog-user-project", OPTIONS.getProject()) .put("rest-metrics-reporting-enabled", "false") .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO") @@ -41,6 +47,12 @@ public static void setup() { .build(); } + @After + public void after() { + // making sure the cleanup path is directed at the correct warehouse + warehouse = BIGLAKE_WAREHOUSE; + } + @Override public String type() { return "biglake";