From eb0fb29d60bd6b1233c6e8bfadf3a537dcc4c259 Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Wed, 30 Jul 2025 07:19:03 +0000 Subject: [PATCH 01/10] Adding time partitioning support for python storage write api --- .../providers/BigQueryWriteConfiguration.java | 39 +++++++++ .../PortableBigQueryDestinations.java | 37 ++++++++- .../bigquery/providers/BigQueryManagedIT.java | 16 +++- .../io/external/xlang_bigqueryio_it_test.py | 82 +++++++++++++++++-- sdks/python/apache_beam/io/gcp/bigquery.py | 5 ++ .../content/en/documentation/io/managed-io.md | 12 +++ 6 files changed, 181 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 5df6e1f6afcd..b4e467902099 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -58,6 +59,39 @@ public abstract static class Builder { } } + @AutoValue + public abstract static class TimePartitioningConfig implements Serializable { + @SchemaFieldDescription("The time partitioning type.") + public abstract @Nullable String getType(); + + @SchemaFieldDescription("If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field.") + public abstract @Nullable String getField(); + + @SchemaFieldDescription("The number of milliseconds for which to keep the storage for a partition.") + public abstract @Nullable Long getExpirationMs(); + + @SchemaFieldDescription("If set to true, queries over this table require a partition filter.") + public abstract @Nullable Boolean getRequirePartitionFilter(); + + public static Builder builder() { + return new AutoValue_BigQueryWriteConfiguration_TimePartitioningConfig.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder implements Serializable { + + public abstract Builder setType(String type); + + public abstract Builder setField(String field); + + public abstract Builder setExpirationMs(Long expirationMs); + + public abstract Builder setRequirePartitionFilter(Boolean requirePartitionFilter); + + public abstract TimePartitioningConfig build(); + } + } + public void validate() { String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: "; @@ -197,6 +231,9 @@ public static Builder builder() { @SchemaFieldDescription("A list of columns to cluster the BigQuery table by.") public abstract @Nullable List getClusteringFields(); + @SchemaFieldDescription("Configuration for BigQuery time partitioning.") + public abstract @Nullable TimePartitioningConfig getTimePartitioningConfig(); + /** Builder for {@link BigQueryWriteConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -231,6 +268,8 @@ public abstract static class Builder { public abstract Builder setClusteringFields(List clusteringFields); + public abstract Builder setTimePartitioningConfig(TimePartitioningConfig config); + /** Builds a {@link BigQueryWriteConfiguration} instance. */ public abstract BigQueryWriteConfiguration build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index 42eee4f3f03c..42887b451eb9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -25,6 +25,7 @@ import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import java.util.List; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.annotations.Internal; @@ -33,6 +34,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.TimePartitioningConfig; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.RowFilter; @@ -50,9 +52,11 @@ public class PortableBigQueryDestinations extends DynamicDestinations primaryKey; private final RowFilter rowFilter; private final @Nullable List clusteringFields; + private final @Nullable TimePartitioningConfig timePartitioningConfig; public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) { this.clusteringFields = configuration.getClusteringFields(); + this.timePartitioningConfig = configuration.getTimePartitioningConfig(); // DYNAMIC_DESTINATIONS magic string is the old way of doing it for cross-language. // In that case, we do no interpolation if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { @@ -83,9 +87,40 @@ public String getDestination(@Nullable ValueInSingleWindow element) { @Override public TableDestination getTable(String destination) { + TimePartitioning timePartitioning = null; + + if (timePartitioningConfig != null) { + String type = timePartitioningConfig.getType(); + String field = timePartitioningConfig.getField(); + Long expirationMs = timePartitioningConfig.getExpirationMs(); + Boolean requirePartitionFilter = timePartitioningConfig.getRequirePartitionFilter(); + + if (type == null) { + throw new IllegalArgumentException( + "TimePartitioning 'type' must be specified (DAY, HOUR, MONTH, or YEAR)."); + } + + timePartitioning = + new TimePartitioning().setType(type); // type is required, as checked earlier + + if (field != null) { + timePartitioning.setField(field); + } + + if (expirationMs != null) { + timePartitioning.setExpirationMs(expirationMs); + } + + if (requirePartitionFilter != null) { + timePartitioning.setRequirePartitionFilter(requirePartitionFilter); + } + } + if (clusteringFields != null && !clusteringFields.isEmpty()) { Clustering clustering = new Clustering().setFields(clusteringFields); - return new TableDestination(destination, null, null, clustering); + return new TableDestination(destination, null, timePartitioning, clustering); + } else if (timePartitioning != null) { + return new TableDestination(destination, null, timePartitioning); } return new TableDestination(destination, null); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index 4c164e6a38db..ecaf23baaafe 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TimePartitioning; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -51,7 +52,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; @@ -73,9 +73,13 @@ public class BigQueryManagedIT { Schema.Field.of("number", Schema.FieldType.INT64), Schema.Field.of("dest", Schema.FieldType.INT64)); + // Schema.Field.of("field_date", Schema.FieldType.DATETIME) + private static final SerializableFunction ROW_FUNC = l -> Row.withSchema(SCHEMA).addValue(Long.toString(l)).addValue(l).addValue(l % 3).build(); + // .addValue(org.joda.time.Instant.parse("2025-07-29T03:15:03.241Z")) + private static final List ROWS = LongStream.range(0, 20).mapToObj(ROW_FUNC::apply).collect(Collectors.toList()); @@ -83,10 +87,13 @@ public class BigQueryManagedIT { private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + System.nanoTime(); private static final Clustering CLUSTERING = new Clustering().setFields(Arrays.asList("str")); + private static final TimePartitioning TIME_PARTITIONING = new TimePartitioning().setType("DAY"); + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. @@ -102,8 +109,12 @@ public static void cleanup() { public void testBatchFileLoadsWriteRead() throws IOException, InterruptedException { String table = String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); + Map writeConfig = - ImmutableMap.of("table", table, "clustering_fields", Collections.singletonList("str")); + ImmutableMap.of( + "table", table, + "clustering_fields", Collections.singletonList("str"), + "time_partitioning_config", ImmutableMap.of("type", "DAY")); // file loads requires a GCS temp location String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); @@ -128,6 +139,7 @@ public void testBatchFileLoadsWriteRead() throws IOException, InterruptedExcepti Table tableMetadata = BQ_CLIENT.getTableResource(PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); Assert.assertEquals(CLUSTERING, tableMetadata.getClustering()); + Assert.assertEquals(TIME_PARTITIONING, tableMetadata.getTimePartitioning()); } @Test diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 38d9174cef2b..e30d6cb26a6f 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -69,7 +69,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "a", "bool": True, "bytes": b'a', - "timestamp": Timestamp(1000, 100) + "timestamp": Timestamp(1000, 100), + "event_time": Timestamp(1722243600) }, { "int": 2, @@ -78,7 +79,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "b", "bool": False, "bytes": b'b', - "timestamp": Timestamp(2000, 200) + "timestamp": Timestamp(2000, 200), + "event_time": Timestamp(1722277200) }, { "int": 3, @@ -87,7 +89,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "c", "bool": True, "bytes": b'd', - "timestamp": Timestamp(3000, 300) + "timestamp": Timestamp(3000, 300), + "event_time": Timestamp(1722304200) }, { "int": 4, @@ -96,17 +99,19 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "d", "bool": False, "bytes": b'd', - "timestamp": Timestamp(4000, 400) + "timestamp": Timestamp(4000, 400), + "event_time": Timestamp(1722383999) } ] ALL_TYPES_SCHEMA = ( "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING," - "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP") + "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP,event_time:TIMESTAMP") def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() - self.project = self.test_pipeline.get_option('project') + # self.project = self.test_pipeline.get_option('project') + self.project = "tanusharmaa" self._runner = PipelineOptions(self.args).get_all_options()['runner'] self.bigquery_client = BigQueryWrapper() @@ -266,7 +271,8 @@ def test_write_with_clustering(self): write_disposition='WRITE_TRUNCATE', additional_bq_parameters={'clustering': { 'fields': ['int'] - }})) + } + })) # After pipeline finishes, verify clustering is applied table = self.bigquery_client.get_table(self.project, self.dataset_id, table) @@ -275,6 +281,68 @@ def test_write_with_clustering(self): self.assertEqual(clustering_fields, ['int']) hamcrest_assert(p, bq_matcher) + def test_write_with_time_partitioning(self): + table = 'write_with_time_partitioning' + table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) + + EXPECTED_DATA = [ + # (int, float, numeric, string, bool, bytes, timestamp, timestamp) + { + "int": 1, + "float": 0.1, + "numeric": Decimal("1.11"), + "str": "a", + "bool": True, + "bytes": b'a', + "timestamp": Timestamp(1000, 100), + "event_time": Timestamp(1722243600) + }, + { + "int": 2, + "float": 0.2, + "numeric": Decimal("2.22"), + "str": "b", + "bool": False, + "bytes": b'b', + "timestamp": Timestamp(2000, 200), + "event_time": Timestamp(1722277200) + } + ] + + bq_matcher = BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM {}.{} WHERE DATE(event_time) = '2024-07-29'" + .format(self.dataset_id, table), + data=self.parse_expected_data(EXPECTED_DATA)) + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + additional_bq_parameters={ + 'timePartitioning': { + 'type': 'DAY', + 'field': 'event_time', + 'expiration_ms': 2592000000, + 'require_partition_filter': True + } + })) + + # After pipeline finishes, verify time partitioning is applied + table = self.bigquery_client.get_table(self.project, self.dataset_id, table) + + self.assertEqual(table.timePartitioning.type, 'DAY') + self.assertEqual(table.timePartitioning.field, 'event_time') + self.assertEqual(table.timePartitioning.requirePartitionFilter, True) + self.assertEqual(table.timePartitioning.expirationMs, 2592000000) + hamcrest_assert(p, bq_matcher) + def test_write_with_beam_rows_cdc(self): table = 'write_with_beam_rows_cdc' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index baabedb20e44..b7b736e2a6e8 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2702,6 +2702,8 @@ def expand(self, input): table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS clustering_fields = [] + time_partitioning_config= {} + if self.additional_bq_parameters: if callable(self.additional_bq_parameters): raise NotImplementedError( @@ -2709,6 +2711,8 @@ def expand(self, input): "supported for STORAGE_WRITE_API write method.") clustering_fields = ( self.additional_bq_parameters.get("clustering", {}).get("fields", [])) + time_partitioning_config = ( + self.additional_bq_parameters.get("timePartitioning", None)) output = ( input_beam_rows @@ -2726,6 +2730,7 @@ def expand(self, input): use_cdc_writes=self._use_cdc_writes, primary_key=self._primary_key, clustering_fields=clustering_fields, + time_partitioning_config=time_partitioning_config, error_handling={ 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS })) diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md index 53631d279381..a464b335b2d7 100644 --- a/website/www/site/content/en/documentation/io/managed-io.md +++ b/website/www/site/content/en/documentation/io/managed-io.md @@ -811,6 +811,18 @@ For more information on table properties, please visit https://iceberg.apache.or Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds. + + + Time Partitiong + + + map[str, str] + + + Configuration for time-based partitioning on a DATE, TIMESTAMP, or DATETIME column when writing to the output table. + + + From 79d0de6997d40ef02795974fdcd20dda5778b4d0 Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Wed, 30 Jul 2025 07:29:57 +0000 Subject: [PATCH 02/10] corrections --- .../apache_beam/io/external/xlang_bigqueryio_it_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index e30d6cb26a6f..c5b15cd5a2b8 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -61,7 +61,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' ELEMENTS = [ - # (int, float, numeric, string, bool, bytes, timestamp) + # (int, float, numeric, string, bool, bytes, timestamp, timestamp) { "int": 1, "float": 0.1, @@ -110,8 +110,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() - # self.project = self.test_pipeline.get_option('project') - self.project = "tanusharmaa" + self.project = self.test_pipeline.get_option('project') self._runner = PipelineOptions(self.args).get_all_options()['runner'] self.bigquery_client = BigQueryWrapper() From e437cff3083e73bd5c65594cf6317fa78f3f11df Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Wed, 30 Jul 2025 07:33:41 +0000 Subject: [PATCH 03/10] Fix Typos --- .../beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java | 2 -- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- website/www/site/content/en/documentation/io/managed-io.md | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index ecaf23baaafe..12514d285ff7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -73,8 +73,6 @@ public class BigQueryManagedIT { Schema.Field.of("number", Schema.FieldType.INT64), Schema.Field.of("dest", Schema.FieldType.INT64)); - // Schema.Field.of("field_date", Schema.FieldType.DATETIME) - private static final SerializableFunction ROW_FUNC = l -> Row.withSchema(SCHEMA).addValue(Long.toString(l)).addValue(l).addValue(l % 3).build(); diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index b7b736e2a6e8..a442ff323c80 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2702,7 +2702,7 @@ def expand(self, input): table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS clustering_fields = [] - time_partitioning_config= {} + time_partitioning_config= None if self.additional_bq_parameters: if callable(self.additional_bq_parameters): diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md index a464b335b2d7..5b8809606955 100644 --- a/website/www/site/content/en/documentation/io/managed-io.md +++ b/website/www/site/content/en/documentation/io/managed-io.md @@ -813,7 +813,7 @@ For more information on table properties, please visit https://iceberg.apache.or - Time Partitiong + Time Partitioning map[str, str] From 86f8c383b88eac881ae5559986a5ee67ffc9a17e Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Thu, 31 Jul 2025 08:52:58 +0000 Subject: [PATCH 04/10] Spotless apply and python formatting --- .../providers/BigQueryWriteConfiguration.java | 6 +- .../bigquery/providers/BigQueryManagedIT.java | 1 + .../io/external/xlang_bigqueryio_it_test.py | 61 +++++++++---------- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- sdks/standard_external_transforms.yaml | 2 +- .../content/en/documentation/io/managed-io.md | 12 ---- 6 files changed, 37 insertions(+), 47 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index b4e467902099..83df11f0e959 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -64,10 +64,12 @@ public abstract static class TimePartitioningConfig implements Serializable { @SchemaFieldDescription("The time partitioning type.") public abstract @Nullable String getType(); - @SchemaFieldDescription("If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field.") + @SchemaFieldDescription( + "If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field.") public abstract @Nullable String getField(); - @SchemaFieldDescription("The number of milliseconds for which to keep the storage for a partition.") + @SchemaFieldDescription( + "The number of milliseconds for which to keep the storage for a partition.") public abstract @Nullable Long getExpirationMs(); @SchemaFieldDescription("If set to true, queries over this table require a partition filter.") diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index 12514d285ff7..fc25fa781fd4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -52,6 +52,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index c5b15cd5a2b8..70635818c681 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -270,8 +270,7 @@ def test_write_with_clustering(self): write_disposition='WRITE_TRUNCATE', additional_bq_parameters={'clustering': { 'fields': ['int'] - } - })) + }})) # After pipeline finishes, verify clustering is applied table = self.bigquery_client.get_table(self.project, self.dataset_id, table) @@ -286,32 +285,32 @@ def test_write_with_time_partitioning(self): EXPECTED_DATA = [ # (int, float, numeric, string, bool, bytes, timestamp, timestamp) - { - "int": 1, - "float": 0.1, - "numeric": Decimal("1.11"), - "str": "a", - "bool": True, - "bytes": b'a', - "timestamp": Timestamp(1000, 100), - "event_time": Timestamp(1722243600) - }, - { - "int": 2, - "float": 0.2, - "numeric": Decimal("2.22"), - "str": "b", - "bool": False, - "bytes": b'b', - "timestamp": Timestamp(2000, 200), - "event_time": Timestamp(1722277200) - } - ] + { + "int": 1, + "float": 0.1, + "numeric": Decimal("1.11"), + "str": "a", + "bool": True, + "bytes": b'a', + "timestamp": Timestamp(1000, 100), + "event_time": Timestamp(1722243600) + }, + { + "int": 2, + "float": 0.2, + "numeric": Decimal("2.22"), + "str": "b", + "bool": False, + "bytes": b'b', + "timestamp": Timestamp(2000, 200), + "event_time": Timestamp(1722277200) + } + ] bq_matcher = BigqueryFullResultMatcher( project=self.project, - query="SELECT * FROM {}.{} WHERE DATE(event_time) = '2024-07-29'" - .format(self.dataset_id, table), + query="SELECT * FROM {}.{} WHERE DATE(event_time) = '2024-07-29'". + format(self.dataset_id, table), data=self.parse_expected_data(EXPECTED_DATA)) with beam.Pipeline(argv=self.args) as p: @@ -325,12 +324,12 @@ def test_write_with_time_partitioning(self): create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_TRUNCATE', additional_bq_parameters={ - 'timePartitioning': { - 'type': 'DAY', - 'field': 'event_time', - 'expiration_ms': 2592000000, - 'require_partition_filter': True - } + 'timePartitioning': { + 'type': 'DAY', + 'field': 'event_time', + 'expiration_ms': 2592000000, + 'require_partition_filter': True + } })) # After pipeline finishes, verify time partitioning is applied diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index a442ff323c80..2039cb537720 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2702,7 +2702,7 @@ def expand(self, input): table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS clustering_fields = [] - time_partitioning_config= None + time_partitioning_config = None if self.additional_bq_parameters: if callable(self.additional_bq_parameters): diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 1c536ce319d2..3f3caf0bb279 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,7 +19,7 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2025-06-05 +# Last updated on: 2025-07-31 - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md index 5b8809606955..53631d279381 100644 --- a/website/www/site/content/en/documentation/io/managed-io.md +++ b/website/www/site/content/en/documentation/io/managed-io.md @@ -811,18 +811,6 @@ For more information on table properties, please visit https://iceberg.apache.or Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds. - - - Time Partitioning - - - map[str, str] - - - Configuration for time-based partitioning on a DATE, TIMESTAMP, or DATETIME column when writing to the output table. - - - From c4917eeeca16f9241c225088b7884991564809fa Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Mon, 4 Aug 2025 05:06:59 +0000 Subject: [PATCH 05/10] formatting --- sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 70635818c681..110609e45570 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -284,7 +284,7 @@ def test_write_with_time_partitioning(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) EXPECTED_DATA = [ - # (int, float, numeric, string, bool, bytes, timestamp, timestamp) + # (int, float, numeric, string, bool, bytes, timestamp, timestamp) { "int": 1, "float": 0.1, From 0d581049b5d7b714f51c913634d39a07b5187eee Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Tue, 5 Aug 2025 05:27:57 +0000 Subject: [PATCH 06/10] Clean up --- .../beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index fc25fa781fd4..3eff00da0268 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -77,8 +77,6 @@ public class BigQueryManagedIT { private static final SerializableFunction ROW_FUNC = l -> Row.withSchema(SCHEMA).addValue(Long.toString(l)).addValue(l).addValue(l % 3).build(); - // .addValue(org.joda.time.Instant.parse("2025-07-29T03:15:03.241Z")) - private static final List ROWS = LongStream.range(0, 20).mapToObj(ROW_FUNC::apply).collect(Collectors.toList()); @@ -134,7 +132,7 @@ public void testBatchFileLoadsWriteRead() throws IOException, InterruptedExcepti PAssert.that(outputRows).containsInAnyOrder(ROWS); readPipeline.run().waitUntilFinish(); - // Asserting clustering + // Asserting clustering and time partitioning Table tableMetadata = BQ_CLIENT.getTableResource(PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); Assert.assertEquals(CLUSTERING, tableMetadata.getClustering()); From 5c49c0bd2a251bfad70dae91fef25de39b535927 Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Mon, 8 Sep 2025 10:41:09 +0000 Subject: [PATCH 07/10] Resolving comments --- .../providers/BigQueryWriteConfiguration.java | 2 +- .../PortableBigQueryDestinations.java | 24 +++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 83df11f0e959..0195e9981f99 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -62,7 +62,7 @@ public abstract static class Builder { @AutoValue public abstract static class TimePartitioningConfig implements Serializable { @SchemaFieldDescription("The time partitioning type.") - public abstract @Nullable String getType(); + public abstract String getType(); @SchemaFieldDescription( "If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field.") diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index 42887b451eb9..75e31dd0128f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -26,7 +26,10 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; @@ -57,6 +60,20 @@ public class PortableBigQueryDestinations extends DynamicDestinations field.getName().equals(partitionField)); + if (!fieldExists) { + throw new IllegalArgumentException( + String.format( + "The partition field '%s' does not exist in the input schema.", partitionField)); + } + } // DYNAMIC_DESTINATIONS magic string is the old way of doing it for cross-language. // In that case, we do no interpolation if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { @@ -95,9 +112,12 @@ public TableDestination getTable(String destination) { Long expirationMs = timePartitioningConfig.getExpirationMs(); Boolean requirePartitionFilter = timePartitioningConfig.getRequirePartitionFilter(); - if (type == null) { + Set allowedTypes = new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR")); + if (!allowedTypes.contains(type)) { throw new IllegalArgumentException( - "TimePartitioning 'type' must be specified (DAY, HOUR, MONTH, or YEAR)."); + String.format( + "Invalid TimePartitioning 'type': '%s'. Allowed values are: %s", + type, allowedTypes)); } timePartitioning = From 080a6883e2ccef77fcc0406ba4b700046ce9111c Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Fri, 10 Oct 2025 01:40:30 +0000 Subject: [PATCH 08/10] nit --- .../io/gcp/bigquery/providers/PortableBigQueryDestinations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index 75e31dd0128f..c8944dd15826 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -56,6 +56,7 @@ public class PortableBigQueryDestinations extends DynamicDestinations clusteringFields; private final @Nullable TimePartitioningConfig timePartitioningConfig; + private static final Set allowedTypes = new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR")); public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) { this.clusteringFields = configuration.getClusteringFields(); @@ -112,7 +113,6 @@ public TableDestination getTable(String destination) { Long expirationMs = timePartitioningConfig.getExpirationMs(); Boolean requirePartitionFilter = timePartitioningConfig.getRequirePartitionFilter(); - Set allowedTypes = new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR")); if (!allowedTypes.contains(type)) { throw new IllegalArgumentException( String.format( From 2fcd2ad3c7e55432278045a8686e1694bd73e07c Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Tue, 14 Oct 2025 01:21:42 +0000 Subject: [PATCH 09/10] formatting --- .../providers/BigQueryWriteConfiguration.java | 4 +- .../PortableBigQueryDestinations.java | 3 +- sdks/standard_external_transforms.yaml | 106 +++++++++--------- 3 files changed, 57 insertions(+), 56 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 28ebd6f0b32e..6c12b846c77f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -235,7 +235,7 @@ public static Builder builder() { @SchemaFieldDescription("Configuration for BigQuery time partitioning.") public abstract @Nullable TimePartitioningConfig getTimePartitioningConfig(); - + @SchemaFieldDescription( "Configuration for creating BigLake tables. The following options are available:" + "\n - connectionId (REQUIRED): the name of your cloud resource connection," @@ -279,7 +279,7 @@ public abstract static class Builder { public abstract Builder setClusteringFields(List clusteringFields); public abstract Builder setTimePartitioningConfig(TimePartitioningConfig config); - + public abstract Builder setBigLakeConfiguration( java.util.Map bigLakeConfiguration); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index c8944dd15826..ffce9ccce99f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -56,7 +56,8 @@ public class PortableBigQueryDestinations extends DynamicDestinations clusteringFields; private final @Nullable TimePartitioningConfig timePartitioningConfig; - private static final Set allowedTypes = new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR")); + private static final Set allowedTypes = + new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR")); public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) { this.clusteringFields = configuration.getClusteringFields(); diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 3f3caf0bb279..cd86a8c5c741 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,60 +19,8 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2025-07-31 +# Last updated on: 2025-10-14 -- default_service: sdks:java:io:expansion-service:shadowJar - description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 - number called "value". The count is produced from the given "start" value and - either up to the given "end" or until 2^63 - 1. - - To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded - sequences can specify a "rate" for output elements. - - In all cases, the sequence of numbers is generated in parallel, so there is no - inherent ordering between the generated values' - destinations: - python: apache_beam/io - fields: - - description: The maximum number to generate (exclusive). Will be an unbounded - sequence if left unspecified. - name: end - nullable: true - type: int64 - - description: Specifies the rate to generate a given number of elements per a given - number of seconds. Applicable only to unbounded sequences. - name: rate - nullable: true - type: Row(elements=, seconds=typing.Optional[int64]) - - description: The minimum number to generate (inclusive). - name: start - nullable: false - type: int64 - identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 - name: GenerateSequence -- default_service: sdks:java:io:expansion-service:shadowJar - description: '' - destinations: - python: apache_beam/io - fields: - - description: Decompression type to use when reading input files. - name: compression - nullable: false - type: str - - description: This option specifies whether and where to output unwritable rows. - name: error_handling - nullable: true - type: Row(output=) - - description: Filename or file pattern used to find input files. - name: file_pattern - nullable: false - type: str - - description: Validate file pattern. - name: validate - nullable: false - type: boolean - identifier: beam:schematransform:org.apache.beam:tfrecord_read:v1 - name: TfrecordRead - default_service: sdks:java:io:expansion-service:shadowJar description: '' destinations: @@ -115,3 +63,55 @@ type: str identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1 name: TfrecordWrite +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + - description: Decompression type to use when reading input files. + name: compression + nullable: false + type: str + - description: This option specifies whether and where to output unwritable rows. + name: error_handling + nullable: true + type: Row(output=) + - description: Filename or file pattern used to find input files. + name: file_pattern + nullable: false + type: str + - description: Validate file pattern. + name: validate + nullable: false + type: boolean + identifier: beam:schematransform:org.apache.beam:tfrecord_read:v1 + name: TfrecordRead +- default_service: sdks:java:io:expansion-service:shadowJar + description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 + number called "value". The count is produced from the given "start" value and + either up to the given "end" or until 2^63 - 1. + + To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded + sequences can specify a "rate" for output elements. + + In all cases, the sequence of numbers is generated in parallel, so there is no + inherent ordering between the generated values' + destinations: + python: apache_beam/io + fields: + - description: The maximum number to generate (exclusive). Will be an unbounded + sequence if left unspecified. + name: end + nullable: true + type: int64 + - description: Specifies the rate to generate a given number of elements per a given + number of seconds. Applicable only to unbounded sequences. + name: rate + nullable: true + type: Row(elements=, seconds=typing.Optional[int64]) + - description: The minimum number to generate (inclusive). + name: start + nullable: false + type: int64 + identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 + name: GenerateSequence From 6a14de3672e2e5adc96ca69804f7f2f0f101645b Mon Sep 17 00:00:00 2001 From: Tanu Sharma Date: Tue, 14 Oct 2025 05:28:43 +0000 Subject: [PATCH 10/10] updated xlang yaml --- sdks/standard_external_transforms.yaml | 46 +++++++++++++------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index cd86a8c5c741..ce9d5993a09f 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -63,29 +63,6 @@ type: str identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1 name: TfrecordWrite -- default_service: sdks:java:io:expansion-service:shadowJar - description: '' - destinations: - python: apache_beam/io - fields: - - description: Decompression type to use when reading input files. - name: compression - nullable: false - type: str - - description: This option specifies whether and where to output unwritable rows. - name: error_handling - nullable: true - type: Row(output=) - - description: Filename or file pattern used to find input files. - name: file_pattern - nullable: false - type: str - - description: Validate file pattern. - name: validate - nullable: false - type: boolean - identifier: beam:schematransform:org.apache.beam:tfrecord_read:v1 - name: TfrecordRead - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 number called "value". The count is produced from the given "start" value and @@ -115,3 +92,26 @@ type: int64 identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 name: GenerateSequence +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + - description: Decompression type to use when reading input files. + name: compression + nullable: false + type: str + - description: This option specifies whether and where to output unwritable rows. + name: error_handling + nullable: true + type: Row(output=) + - description: Filename or file pattern used to find input files. + name: file_pattern + nullable: false + type: str + - description: Validate file pattern. + name: validate + nullable: false + type: boolean + identifier: beam:schematransform:org.apache.beam:tfrecord_read:v1 + name: TfrecordRead