From 9b9d5533c4f5471d2bcb978be880bbd167caf8ea Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 23 Feb 2026 11:43:05 -0500 Subject: [PATCH 1/2] add files transform and schematransform --- .../apache/beam/sdk/io/iceberg/AddFiles.java | 279 ++++++++++++++++++ .../AddFilesSchemaTransformProvider.java | 112 +++++++ 2 files changed, 391 insertions(+) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java new file mode 100644 index 000000000000..324e5e80e049 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetUtil; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +import java.util.List; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; 
+import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERROR_SCHEMA; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +public class AddFiles extends SchemaTransform { + private final IcebergCatalogConfig catalogConfig; + private final String tableIdentifier; + private final Duration intervalTrigger; + private final int numFilesTrigger; + private final @Nullable String locationPrefix; + private static final int DEFAULT_FILES_TRIGGER = 100_000; + private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); + + public AddFiles( + IcebergCatalogConfig catalogConfig, + String tableIdentifier, + @Nullable String locationPrefix, + @Nullable Integer numFilesTrigger, + @Nullable Duration intervalTrigger) { + System.out.println("got catalog config: " + catalogConfig); + this.catalogConfig = catalogConfig; + this.tableIdentifier = tableIdentifier; + this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; + this.numFilesTrigger = numFilesTrigger != null ? 
numFilesTrigger : DEFAULT_FILES_TRIGGER; + this.locationPrefix = locationPrefix; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection filePaths = input.getSinglePCollection(); + Schema inputSchema = filePaths.getSchema(); + Preconditions.checkState( + inputSchema.getFieldCount() == 1 + && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING) + && !inputSchema.getField(0).getType().getNullable(), + "Incoming Row Schema contain only one (required) field of type String."); + + PCollectionTuple dataFiles = + filePaths + .apply( + "ExtractPaths", + MapElements.into(TypeDescriptors.strings()) + .via(row -> checkStateNotNull(row.getString(0)))) + .apply(Redistribute.arbitrarily()) + .apply( + "ConvertToDataFiles", + ParDo.of(new ConvertToDataFile(catalogConfig, tableIdentifier, locationPrefix)) + .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS))); + SchemaCoder sdfSchema; + try { + sdfSchema = SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA); + PCollection snapshots = + dataFiles + .get(DATA_FILES) + .setCoder(sdfSchema) + .apply("AddStaticKey", WithKeys.of((Void) null)) + .apply( + GroupIntoBatches.ofSize(numFilesTrigger) + .withMaxBufferingDuration(intervalTrigger)) + .apply("DropKey", Values.create()) + .apply("AddFilesToIceberg", ParDo.of(new AddFilesDoFn(catalogConfig, tableIdentifier))) + .setRowSchema(SnapshotInfo.getSchema()); + + return PCollectionRowTuple.of("snapshots", snapshots); + } + + static class ConvertToDataFile extends DoFn { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + public static final TupleTag ERRORS = new TupleTag<>(); + public static final TupleTag DATA_FILES = new TupleTag<>(); + public static final Schema ERROR_SCHEMA = + 
Schema.builder().addStringField("file").addStringField("error").build(); + private final @Nullable String prefix; + private static volatile @MonotonicNonNull Table sharedTable; + private boolean isPartitioned = false; + + public ConvertToDataFile( + IcebergCatalogConfig catalogConfig, String identifier, @Nullable String prefix) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.prefix = prefix; + } + + @Setup + public void start() { + if (sharedTable == null) { + synchronized (ConvertToDataFile.class) { + if (sharedTable == null) { + sharedTable = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + } + isPartitioned = sharedTable.spec().isPartitioned(); + Preconditions.checkArgument( + !isPartitioned || prefix != null, + "A location prefix must be specified when adding files to a partitioned table."); + } + + private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; + private static final String PREFIX_ERROR = "File did not start with the specified prefix"; + + @ProcessElement + public void process(@Element String filePath, MultiOutputReceiver output) { + final Table table = checkStateNotNull(sharedTable); + boolean isPartitioned = table.spec().isPartitioned(); + if (isPartitioned && !filePath.startsWith(checkStateNotNull(prefix))) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + return; + } + + InputFile inputFile = table.io().newInputFile(filePath); + FileFormat format; + try { + format = inferFormat(inputFile.location()); + } catch (UnknownFormat e) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + return; + } + Metrics metrics = getFileMetrics(inputFile, format, MetricsConfig.forTable(table)); + + String partitionPath = getPartitionPath(filePath); + DataFile df = + DataFiles.builder(table.spec()) + .withPath(filePath) + .withFormat(format) + 
.withMetrics(metrics) + .withFileSizeInBytes(inputFile.getLength()) + .withPartitionPath(partitionPath) + .build(); + + output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); + } + + private String getPartitionPath(String filePath) { + if (!isPartitioned) { + return ""; + } + String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); + List components = Lists.newArrayList(Splitter.on('/').split(partitionPath)); + if (!components.isEmpty()) { + components = components.subList(0, components.size() - 1); + } + return String.join("/", components); + } + } + + static class AddFilesDoFn extends DoFn, Row> { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + private @MonotonicNonNull Table table = null; + + public AddFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + } + + @Setup + public void start() { + table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + + @ProcessElement + public void process(@Element Iterable files, OutputReceiver output) { + if (!files.iterator().hasNext()) { + return; + } + Table table = checkStateNotNull(this.table); + + AppendFiles appendFiles = table.newAppend(); + for (SerializableDataFile file : files) { + DataFile df = file.createDataFile(table.specs()); + appendFiles.appendFile(df); + } + + appendFiles.commit(); + Snapshot snapshot = table.currentSnapshot(); + output.output(SnapshotInfo.fromSnapshot(snapshot).toRow()); + } + } + + @SuppressWarnings("argument") + public static Metrics getFileMetrics(InputFile file, FileFormat format, MetricsConfig config) { + switch (format) { + case PARQUET: + return ParquetUtil.fileMetrics(file, config); + case ORC: + return OrcMetrics.fromInputFile(file, config); + case AVRO: + return new Metrics(Avro.rowCount(file), null, null, null, null); + default: + throw new UnsupportedOperationException("Unsupported 
format: " + format); + } + } + + public static FileFormat inferFormat(String path) { + String lowerPath = path.toLowerCase(); + + if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".pqt")) { + return FileFormat.PARQUET; + } else if (lowerPath.endsWith(".orc")) { + return FileFormat.ORC; + } else if (lowerPath.endsWith(".avro")) { + return FileFormat.AVRO; + } + + throw new UnknownFormat(); + } + + static class UnknownFormat extends IllegalArgumentException {} +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java new file mode 100644 index 000000000000..b402902cba1d --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +import java.util.Map; + +import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; + +@AutoService(SchemaTransformProvider.class) +public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider { + @Override + public AddFiles from(Configuration configuration) { + @Nullable Integer frequency = configuration.getTriggeringFrequencySeconds(); + + return new AddFiles( + configuration.getIcebergCatalog(), + configuration.getTable(), + configuration.getLocationPrefix(), + configuration.getAppendBatchSize(), + frequency != null ? Duration.standardSeconds(frequency) : null); + } + + @Override + public String identifier() { + return "beam:schematransform:iceberg_add_files:v1"; + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + public static Builder builder() { + return new AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("A fully-qualified table identifier.") + public abstract String getTable(); + + @SchemaFieldDescription("Properties used to set up the Iceberg catalog.") + public abstract @Nullable Map getCatalogProperties(); + + @SchemaFieldDescription("Properties passed to the Hadoop Configuration.") + public abstract @Nullable Map getConfigProperties(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the frequency at which incoming files are appended. 
Defaults to 600 (10 minutes). " + + "A commit is triggered when either this or append batch size is reached.") + public abstract @Nullable Integer getTriggeringFrequencySeconds(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the desired number of appended files per commit. Defaults to 100,000 files. " + + "A commit is triggered when either this or append triggering interval is reached.") + public abstract @Nullable Integer getAppendBatchSize(); + + @SchemaFieldDescription( + "The prefix shared among all partitions. For example, a data file may have the following" + + " location:\n" + + "'file:/Users/user/Documents/tmp/namespace/table_name/data/id=13/name=beam/data_file.parquet'\n\n" + + "The provided prefix should go up until the partition information:\n" + + "'file:/Users/user/Documents/tmp/namespace/table_name/data/'.\n" + + "Required if the table is partitioned. ") + public abstract @Nullable String getLocationPrefix(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTable(String table); + + public abstract Builder setCatalogProperties(Map catalogProperties); + + public abstract Builder setConfigProperties(Map confProperties); + + public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + + public abstract Builder setAppendBatchSize(Integer size); + + public abstract Builder setLocationPrefix(String prefix); + + public abstract Configuration build(); + } + + public IcebergCatalogConfig getIcebergCatalog() { + return IcebergCatalogConfig.builder() + .setCatalogProperties(getCatalogProperties()) + .setConfigProperties(getConfigProperties()) + .build(); + } + } +} From 98328b8e85bed28a66298c5bbdab490433ef7e2c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 24 Feb 2026 13:01:21 -0500 Subject: [PATCH 2/2] minor fixes --- .../org/apache/beam/sdk/io/iceberg/AddFiles.java | 14 ++++++++------ .../iceberg/AddFilesSchemaTransformProvider.java | 7 +++---- 2 files 
changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 324e5e80e049..9ed3a52cfff6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; @@ -77,7 +78,6 @@ public AddFiles( @Nullable String locationPrefix, @Nullable Integer numFilesTrigger, @Nullable Duration intervalTrigger) { - System.out.println("got catalog config: " + catalogConfig); this.catalogConfig = catalogConfig; this.tableIdentifier = tableIdentifier; this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; @@ -91,12 +91,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { Schema inputSchema = filePaths.getSchema(); Preconditions.checkState( inputSchema.getFieldCount() == 1 - && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING) - && !inputSchema.getField(0).getType().getNullable(), - "Incoming Row Schema contain only one (required) field of type String."); + && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING), + "Incoming Row Schema must contain only one field of type String. 
Instead, got schema: %s", + inputSchema); PCollectionTuple dataFiles = filePaths + .apply("Filter empty paths", Filter.by(row -> row.getString(0) != null)) .apply( "ExtractPaths", MapElements.into(TypeDescriptors.strings()) @@ -112,7 +113,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } catch (Exception e) { throw new RuntimeException(e); } - dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA); + PCollection snapshots = dataFiles .get(DATA_FILES) @@ -125,7 +126,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .apply("AddFilesToIceberg", ParDo.of(new AddFilesDoFn(catalogConfig, tableIdentifier))) .setRowSchema(SnapshotInfo.getSchema()); - return PCollectionRowTuple.of("snapshots", snapshots); + return PCollectionRowTuple.of( + "snapshots", snapshots, "errors", dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA)); } static class ConvertToDataFile extends DoFn { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java index b402902cba1d..8322667ac656 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -17,8 +17,11 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; @@ -27,10 +30,6 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; -import java.util.Map; - -import static 
org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; - @AutoService(SchemaTransformProvider.class) public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider { @Override