diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 55d7f7c8d72a..6c12b846c77f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -58,6 +59,41 @@ public abstract static class Builder { } } + @AutoValue + public abstract static class TimePartitioningConfig implements Serializable { + @SchemaFieldDescription("The time partitioning type.") + public abstract String getType(); + + @SchemaFieldDescription( + "If not set, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the table is partitioned by this field.") + public abstract @Nullable String getField(); + + @SchemaFieldDescription( + "The number of milliseconds for which to keep the storage for a partition.") + public abstract @Nullable Long getExpirationMs(); + + @SchemaFieldDescription("If set to true, queries over this table require a partition filter.") + public abstract @Nullable Boolean getRequirePartitionFilter(); + + public static Builder builder() { + return new AutoValue_BigQueryWriteConfiguration_TimePartitioningConfig.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder implements Serializable { + + public abstract Builder setType(String type); + + public abstract Builder setField(String field); + + public abstract Builder setExpirationMs(Long 
expirationMs); + + public abstract Builder setRequirePartitionFilter(Boolean requirePartitionFilter); + + public abstract TimePartitioningConfig build(); + } + } + public void validate() { String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: "; @@ -197,6 +233,9 @@ public static Builder builder() { @SchemaFieldDescription("A list of columns to cluster the BigQuery table by.") public abstract @Nullable List<String> getClusteringFields(); + @SchemaFieldDescription("Configuration for BigQuery time partitioning.") + public abstract @Nullable TimePartitioningConfig getTimePartitioningConfig(); + @SchemaFieldDescription( "Configuration for creating BigLake tables. The following options are available:" + "\n - connectionId (REQUIRED): the name of your cloud resource connection," @@ -239,6 +278,8 @@ public abstract static class Builder { public abstract Builder setClusteringFields(List<String> clusteringFields); + public abstract Builder setTimePartitioningConfig(TimePartitioningConfig config); + public abstract Builder setBigLakeConfiguration( java.util.Map<String, String> bigLakeConfiguration); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index 42eee4f3f03c..ffce9ccce99f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -25,7 +25,11 @@ import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; +import java.util.Arrays; +import java.util.HashSet; 
import java.util.List; +import java.util.Set; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; @@ -33,6 +37,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.TimePartitioningConfig; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.RowFilter; @@ -50,9 +55,27 @@ public class PortableBigQueryDestinations extends DynamicDestinations<Row> { private final @Nullable List<String> primaryKey; private final RowFilter rowFilter; private final @Nullable List<String> clusteringFields; + private final @Nullable TimePartitioningConfig timePartitioningConfig; + private static final Set<String> allowedTypes = + new HashSet<>(Arrays.asList("DAY", "HOUR", "MONTH", "YEAR")); public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) { this.clusteringFields = configuration.getClusteringFields(); + this.timePartitioningConfig = configuration.getTimePartitioningConfig(); + + // Validate partition field exists if time partitioning field is set + if (this.timePartitioningConfig != null && this.timePartitioningConfig.getField() != null) { + String partitionField = this.timePartitioningConfig.getField(); + + // Check if the partition field exists in the schema + boolean fieldExists = + rowSchema.getFields().stream().anyMatch(field -> field.getName().equals(partitionField)); + if (!fieldExists) { + throw new IllegalArgumentException( + String.format( + "The partition field '%s' does not exist in the input schema.", partitionField)); + } + } // DYNAMIC_DESTINATIONS magic string is the old way of doing it for cross-language. 
// In that case, we do no interpolation if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { @@ -83,9 +106,42 @@ public String getDestination(@Nullable ValueInSingleWindow<Row> element) { @Override public TableDestination getTable(String destination) { + TimePartitioning timePartitioning = null; + + if (timePartitioningConfig != null) { + String type = timePartitioningConfig.getType(); + String field = timePartitioningConfig.getField(); + Long expirationMs = timePartitioningConfig.getExpirationMs(); + Boolean requirePartitionFilter = timePartitioningConfig.getRequirePartitionFilter(); + + if (!allowedTypes.contains(type)) { + throw new IllegalArgumentException( + String.format( + "Invalid TimePartitioning 'type': '%s'. Allowed values are: %s", + type, allowedTypes)); + } + + timePartitioning = + new TimePartitioning().setType(type); // type is required, as checked earlier + + if (field != null) { + timePartitioning.setField(field); + } + + if (expirationMs != null) { + timePartitioning.setExpirationMs(expirationMs); + } + + if (requirePartitionFilter != null) { + timePartitioning.setRequirePartitionFilter(requirePartitionFilter); + } + } + if (clusteringFields != null && !clusteringFields.isEmpty()) { Clustering clustering = new Clustering().setFields(clusteringFields); - return new TableDestination(destination, null, null, clustering); + return new TableDestination(destination, null, timePartitioning, clustering); + } else if (timePartitioning != null) { + return new TableDestination(destination, null, timePartitioning); + } return new TableDestination(destination, null); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index 4c164e6a38db..3eff00da0268 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TimePartitioning; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -83,10 +84,13 @@ public class BigQueryManagedIT { private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + System.nanoTime(); private static final Clustering CLUSTERING = new Clustering().setFields(Arrays.asList("str")); + private static final TimePartitioning TIME_PARTITIONING = new TimePartitioning().setType("DAY"); + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases. 
@@ -102,8 +106,12 @@ public static void cleanup() { public void testBatchFileLoadsWriteRead() throws IOException, InterruptedException { String table = String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); + Map<String, Object> writeConfig = - ImmutableMap.of("table", table, "clustering_fields", Collections.singletonList("str")); + ImmutableMap.of( + "table", table, + "clustering_fields", Collections.singletonList("str"), + "time_partitioning_config", ImmutableMap.of("type", "DAY")); // file loads requires a GCS temp location String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); @@ -124,10 +132,11 @@ public void testBatchFileLoadsWriteRead() throws IOException, InterruptedExcepti PAssert.that(outputRows).containsInAnyOrder(ROWS); readPipeline.run().waitUntilFinish(); - // Asserting clustering + // Asserting clustering and time partitioning Table tableMetadata = BQ_CLIENT.getTableResource(PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); Assert.assertEquals(CLUSTERING, tableMetadata.getClustering()); + Assert.assertEquals(TIME_PARTITIONING, tableMetadata.getTimePartitioning()); } @Test diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 51ae97b99175..900842285ab8 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -66,7 +66,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' ELEMENTS = [ - # (int, float, numeric, string, bool, bytes, timestamp) + # (int, float, numeric, string, bool, bytes, timestamp, event_time) { "int": 1, "float": 0.1, @@ -74,7 +74,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "a", "bool": True, "bytes": b'a', - "timestamp": Timestamp(1000, 100) + "timestamp": Timestamp(1000, 100), + "event_time": Timestamp(1722243600) }, 
{ "int": 2, @@ -83,7 +84,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "b", "bool": False, "bytes": b'b', - "timestamp": Timestamp(2000, 200) + "timestamp": Timestamp(2000, 200), + "event_time": Timestamp(1722277200) }, { "int": 3, @@ -92,7 +94,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "c", "bool": True, "bytes": b'd', - "timestamp": Timestamp(3000, 300) + "timestamp": Timestamp(3000, 300), + "event_time": Timestamp(1722304200) }, { "int": 4, @@ -101,12 +104,13 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): "str": "d", "bool": False, "bytes": b'd', - "timestamp": Timestamp(4000, 400) + "timestamp": Timestamp(4000, 400), + "event_time": Timestamp(1722383999) } ] ALL_TYPES_SCHEMA = ( "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING," - "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP") + "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP,event_time:TIMESTAMP") def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) @@ -336,6 +340,68 @@ def test_write_with_clustering(self): self.assertEqual(clustering_fields, ['int']) hamcrest_assert(p, bq_matcher) + def test_write_with_time_partitioning(self): + table = 'write_with_time_partitioning' + table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) + + EXPECTED_DATA = [ + # (int, float, numeric, string, bool, bytes, timestamp, event_time) + { + "int": 1, + "float": 0.1, + "numeric": Decimal("1.11"), + "str": "a", + "bool": True, + "bytes": b'a', + "timestamp": Timestamp(1000, 100), + "event_time": Timestamp(1722243600) + }, + { + "int": 2, + "float": 0.2, + "numeric": Decimal("2.22"), + "str": "b", + "bool": False, + "bytes": b'b', + "timestamp": Timestamp(2000, 200), + "event_time": Timestamp(1722277200) + } + ] + + bq_matcher = BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM {}.{} WHERE DATE(event_time) = '2024-07-29'". 
+ format(self.dataset_id, table), + data=self.parse_expected_data(EXPECTED_DATA)) + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + additional_bq_parameters={ + 'timePartitioning': { + 'type': 'DAY', + 'field': 'event_time', + 'expiration_ms': 2592000000, + 'require_partition_filter': True + } + })) + + # After pipeline finishes, verify time partitioning is applied + table = self.bigquery_client.get_table(self.project, self.dataset_id, table) + + self.assertEqual(table.timePartitioning.type, 'DAY') + self.assertEqual(table.timePartitioning.field, 'event_time') + self.assertEqual(table.timePartitioning.requirePartitionFilter, True) + self.assertEqual(table.timePartitioning.expirationMs, 2592000000) + hamcrest_assert(p, bq_matcher) + def test_write_with_beam_rows_cdc(self): table = 'write_with_beam_rows_cdc' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 7310bbdc9fb6..7852dd51edd6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2714,6 +2714,8 @@ def expand(self, input): table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS clustering_fields = [] + time_partitioning_config = None + if self.additional_bq_parameters: if callable(self.additional_bq_parameters): raise NotImplementedError( @@ -2721,6 +2723,8 @@ def expand(self, input): "supported for STORAGE_WRITE_API write method.") clustering_fields = ( self.additional_bq_parameters.get("clustering", {}).get("fields", [])) + time_partitioning_config = ( + self.additional_bq_parameters.get("timePartitioning", None)) output = ( input_beam_rows @@ -2738,6 +2742,7 @@ 
def expand(self, input): use_cdc_writes=self._use_cdc_writes, primary_key=self._primary_key, clustering_fields=clustering_fields, + time_partitioning_config=time_partitioning_config, big_lake_configuration=self._big_lake_configuration, error_handling={ 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 1c536ce319d2..ce9d5993a09f 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,8 +19,50 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2025-06-05 +# Last updated on: 2025-10-14 +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + - description: Option to indicate the output sink's compression type. Default is + NONE. + name: compression + nullable: false + type: str + - description: This option specifies whether and where to output unwritable rows. + name: error_handling + nullable: true + type: Row(output=) + - description: The suffix of each file written, combined with prefix and shardTemplate. + name: filename_suffix + nullable: true + type: str + - description: Maximum number of writers created in a bundle before spilling to + shuffle. + name: max_num_writers_per_bundle + nullable: true + type: int32 + - description: Whether to skip the spilling of data caused by having maxNumWritersPerBundle. + name: no_spilling + nullable: true + type: boolean + - description: The number of shards to use, or 0 for automatic. + name: num_shards + nullable: false + type: int32 + - description: The directory to which files will be written. + name: output_prefix + nullable: false + type: str + - description: The shard template of each file written, combined with prefix and + suffix. 
+ name: shard_template + nullable: true + type: str + identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1 + name: TfrecordWrite - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 number called "value". The count is produced from the given "start" value and @@ -73,45 +115,3 @@ type: boolean identifier: beam:schematransform:org.apache.beam:tfrecord_read:v1 name: TfrecordRead -- default_service: sdks:java:io:expansion-service:shadowJar - description: '' - destinations: - python: apache_beam/io - fields: - - description: Option to indicate the output sink's compression type. Default is - NONE. - name: compression - nullable: false - type: str - - description: This option specifies whether and where to output unwritable rows. - name: error_handling - nullable: true - type: Row(output=) - - description: The suffix of each file written, combined with prefix and shardTemplate. - name: filename_suffix - nullable: true - type: str - - description: Maximum number of writers created in a bundle before spilling to - shuffle. - name: max_num_writers_per_bundle - nullable: true - type: int32 - - description: Whether to skip the spilling of data caused by having maxNumWritersPerBundle. - name: no_spilling - nullable: true - type: boolean - - description: The number of shards to use, or 0 for automatic. - name: num_shards - nullable: false - type: int32 - - description: The directory to which files will be written. - name: output_prefix - nullable: false - type: str - - description: The shard template of each file written, combined with prefix and - suffix. - name: shard_template - nullable: true - type: str - identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1 - name: TfrecordWrite