diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..34a6e02150e7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 4 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index c188e4f63853..a1f352d05309 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -96,6 +96,7 @@ dependencies { testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation library.java.google_api_services_bigquery + testImplementation library.java.google_auth_library_oauth2_http testRuntimeOnly library.java.slf4j_jdk14 testImplementation project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java") diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index c2c5dc0b8f4a..e81f75c40fb1 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.iceberg.catalog; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema; import static org.apache.beam.sdk.managed.Managed.ICEBERG; import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; @@ -313,7 +315,7 @@ public Row apply(Long 
num) { }; protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA = - IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); + beamSchemaToIcebergSchema(BEAM_SCHEMA); protected static final SimpleFunction RECORD_FUNC = new SimpleFunction() { @Override @@ -346,7 +348,7 @@ private List populateTable(Table table, @Nullable String charOverride) thro } DataWriter writer = Parquet.writeData(file) - .schema(ICEBERG_SCHEMA) + .schema(table.schema()) .createWriterFunc(GenericParquetWriter::create) .overwrite() .withSpec(table.spec()) @@ -652,7 +654,7 @@ public void testWrite() throws IOException { pipeline.run().waitUntilFinish(); Table table = catalog.loadTable(TableIdentifier.parse(tableId())); - assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA)); + assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(table.schema())); // Read back and check records are correct List returnedRecords = readRecords(table); @@ -664,9 +666,9 @@ public void testWrite() throws IOException { public void testWriteToPartitionedTable() throws IOException { Map config = new HashMap<>(managedIcebergConfig(tableId())); int truncLength = "value_x".length(); - config.put( - "partition_fields", - Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")")); + List partitionFields = + Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")"); + config.put("partition_fields", partitionFields); PCollection input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); @@ -674,6 +676,13 @@ public void testWriteToPartitionedTable() throws IOException { // Read back and check records are correct Table table = catalog.loadTable(TableIdentifier.parse(tableId())); List returnedRecords = readRecords(table); + PartitionSpec expectedSpec = + PartitionSpec.builderFor(table.schema()) + .identity("bool_field") + .hour("datetime") + .truncate("str", truncLength) + 
.build(); + assertEquals(expectedSpec, table.spec()); assertThat( returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } @@ -815,10 +824,8 @@ private void writeToDynamicDestinations( Table table3 = catalog.loadTable(TableIdentifier.parse(tableId() + "_3_d")); Table table4 = catalog.loadTable(TableIdentifier.parse(tableId() + "_4_e")); - org.apache.iceberg.Schema tableSchema = - IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema()); for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) { - assertTrue(t.schema().sameSchema(tableSchema)); + assertEquals(rowFilter.outputSchema(), icebergSchemaToBeamSchema(t.schema())); } // Read back and check records are correct @@ -830,6 +837,7 @@ private void writeToDynamicDestinations( readRecords(table3), readRecords(table4)); + org.apache.iceberg.Schema tableSchema = beamSchemaToIcebergSchema(rowFilter.outputSchema()); SerializableFunction recordFunc = row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row); @@ -936,7 +944,7 @@ public void testWriteToDynamicNamespaces() throws IOException { table3false, table4true, table4false)) { - assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA)); + assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(t.schema())); } // Read back and check records are correct diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java new file mode 100644 index 000000000000..c16df763333f --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.catalog; + +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.rest.RESTCatalog; +import org.junit.After; +import org.junit.BeforeClass; + +/** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake Metastore. This class declares no test methods of its own; it extends {@link IcebergCatalogBaseIT} and only supplies the catalog configuration. */ +public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT { + /** Catalog properties used by {@link #createCatalog()} and {@link #managedIcebergConfig(String)}; assigned in {@link #setup()}. */ private static Map catalogProps; + + // Using a special bucket for this test class because + // BigLake does not support using subfolders as a warehouse (yet) + private static final String BIGLAKE_WAREHOUSE = "gs://managed-iceberg-biglake-its"; + + /** Runs once before the tests in this class: points the inherited {@code warehouse} at the BigLake bucket and builds the REST catalog properties (BigLake REST endpoint, warehouse, {@code x-goog-user-project} header taken from {@code OPTIONS.getProject()}, metrics reporting disabled, GCS file IO, Google auth manager). */ @BeforeClass + public static void setup() { + warehouse = BIGLAKE_WAREHOUSE; + catalogProps = + ImmutableMap.builder() + .put("type", "rest") + .put("uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog") + .put("warehouse", BIGLAKE_WAREHOUSE) + .put("header.x-goog-user-project", OPTIONS.getProject()) + .put("rest-metrics-reporting-enabled", "false") + .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO") + .put("rest.auth.type", "org.apache.iceberg.gcp.auth.GoogleAuthManager") + .build(); + } + + /** Runs after each test and resets the inherited {@code warehouse} to the BigLake bucket. */ @After + public void after() { + // making sure the cleanup path is directed at the correct warehouse + warehouse = BIGLAKE_WAREHOUSE; + } + + /** Returns the type label {@code "biglake"}. NOTE(review): how the base class consumes this value is not visible here; confirm. */ @Override + public String type() { +
return "biglake"; + } + + /** Creates a new {@link RESTCatalog} and initializes it with the inherited {@code catalogName} and the properties built in {@link #setup()}. */ @Override + public Catalog createCatalog() { + RESTCatalog restCatalog = new RESTCatalog(); + restCatalog.initialize(catalogName, catalogProps); + return restCatalog; + } + + /** Builds the configuration map for {@code tableId}: the table identifier under {@code "table"} and the REST catalog properties under {@code "catalog_properties"}. */ @Override + public Map managedIcebergConfig(String tableId) { + return ImmutableMap.builder() + .put("table", tableId) + .put("catalog_properties", catalogProps) + .build(); + } +}