From 18f8ae8597bca4f500fb380c5eb435f1fd724e80 Mon Sep 17 00:00:00 2001 From: bambadiouf1 Date: Wed, 21 Jan 2026 12:38:35 -0800 Subject: [PATCH] Add DiskProvisionedIops and DiskProvisionedThroughputMibps pipeline options for the Python, Java, and Go SDKs --- .../dataflow/DataflowPipelineTranslator.java | 6 +++++ .../DataflowPipelineWorkerPoolOptions.java | 11 +++++++++ .../DataflowPipelineTranslatorTest.java | 24 +++++++++++++++++++ sdks/go/pkg/beam/runners/dataflow/dataflow.go | 6 +++++ .../beam/runners/dataflow/dataflowlib/job.go | 4 ++++ .../apache_beam/options/pipeline_options.py | 20 ++++++++++++++++ .../options/pipeline_options_test.py | 6 +++++ .../runners/dataflow/internal/apiclient.py | 6 +++++ .../dataflow/internal/apiclient_test.py | 20 ++++++++++++++++ .../dataflow/dataflow_v1b3_messages.py | 13 ++++++++++ 10 files changed, 116 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 08d84705c5c7..e5c864dfb370 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -405,6 +405,12 @@ public Job translate(List packages) { if (options.getDiskSizeGb() > 0) { workerPool.setDiskSizeGb(options.getDiskSizeGb()); } + if (options.getDiskProvisionedIops() != null) { + workerPool.setDiskProvisionedIops(options.getDiskProvisionedIops()); + } + if (options.getDiskProvisionedThroughputMibps() != null) { + workerPool.setDiskProvisionedThroughputMibps(options.getDiskProvisionedThroughputMibps()); + } AutoscalingSettings settings = new AutoscalingSettings(); if (options.getAutoscalingAlgorithm() != null) { settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm()); diff
--git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index fd4af6d5e043..31c5f0bcd830 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -193,6 +193,17 @@ public String getAlgorithm() { void setWorkerDiskType(String value); +@Description("IOPS provisioned for the root disk for VMs. If zero or " + + "unspecified, the service will attempt to choose a reasonable default.") +Long getDiskProvisionedIops(); +void setDiskProvisionedIops(Long diskProvisionedIops); + +@Description("Throughput provisioned in MiB/s for the root disk for VMs. If zero or " + + "unspecified, the service will attempt to choose a reasonable default.") +Long getDiskProvisionedThroughputMibps(); +void setDiskProvisionedThroughputMibps(Long diskProvisionedThroughputMibps); + + /** * Specifies whether worker pools should be started with public IP addresses. 
* diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8226dc2c7274..39d32d814f7d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -619,6 +619,30 @@ public void testDiskSizeGbConfig() throws IOException { assertEquals(diskSizeGb, job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb()); } + @Test + public void testDiskProvisioningTranslation() { + DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); + options.setDiskProvisionedIops(Long.valueOf(7000)); + options.setDiskProvisionedThroughputMibps(Long.valueOf(250)); + options.setProject("test-project"); // Required for translator + + WorkerPool pool = translateWorkerPool(options); + + assertEquals(Long.valueOf(7000), pool.getDiskProvisionedIops()); + assertEquals(Long.valueOf(250), pool.getDiskProvisionedThroughputMibps()); + } + + @Test + public void testDiskProvisioningTranslationDefaults() { + DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); + options.setProject("test-project"); // Required for translator + + WorkerPool pool = translateWorkerPool(options); + + assertNull(pool.getDiskProvisionedIops()); + assertNull(pool.getDiskProvisionedThroughputMibps()); + } + /** A composite transform that returns an output that is unrelated to the input. 
*/ private static class UnrelatedOutputCreator extends PTransform, PCollection> { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 101441dbcb56..52e70ae5fc6f 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -63,6 +63,8 @@ var ( maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).") diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).") diskType = flag.String("disk_type", "", "Type of root disk for VMs (optional).") + diskProvisionedIOPS = flag.Int64("disk_provisioned_iops", 0, "Provisioned IOPS for the root disk for VMs (optional).") + diskProvisionedThroughputMibps = flag.Int64("disk_provisioned_throughput_mibps", 0, "Provisioned throughput for the root disk for VMs (optional).") autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).") zone = flag.String("zone", "", "GCP zone (optional)") kmsKey = flag.String("dataflow_kms_key", "", "The Cloud KMS key identifier used to encrypt data at rest (optional).") @@ -115,6 +117,8 @@ var flagFilter = map[string]bool{ "max_num_workers": true, "disk_size_gb": true, "disk_type": true, + "disk_provisioned_iops": true, + "disk_provisioned_throughput_mibps": true, "autoscaling_algorithm": true, "zone": true, "network": true, @@ -396,6 +400,8 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions WorkerHarnessThreads: *workerHarnessThreads, DiskSizeGb: *diskSizeGb, DiskType: *diskType, + DiskProvisionedIOPS: *diskProvisionedIOPS, + DiskProvisionedThroughputMibps: *diskProvisionedThroughputMibps, Algorithm: *autoscalingAlgorithm, FlexRSGoal: *flexRSGoal, MachineType: *firstNonEmpty(workerMachineType, machineType), diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index 
96c0750d18e3..735e069318a8 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -58,6 +58,8 @@ type JobOptions struct { NumWorkers int64 DiskSizeGb int64 DiskType string + DiskProvisionedIOPS int64 + DiskProvisionedThroughputMibps int64 MachineType string Labels map[string]string ServiceAccountEmail string @@ -191,6 +193,8 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker }, DiskSizeGb: opts.DiskSizeGb, DiskType: opts.DiskType, + DiskProvisionedIops: opts.DiskProvisionedIOPS, + DiskProvisionedThroughputMibps: opts.DiskProvisionedThroughputMibps, IpConfiguration: ipConfiguration, Kind: "harness", Packages: packages, diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 0e1012b2de65..a4b62a7f6f0c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1386,6 +1386,26 @@ def _add_argparse_args(cls, parser): dest='disk_type', default=None, help=('Specifies what type of persistent disk should be used.')) + parser.add_argument( + '--disk_provisioned_iops', + type=int, + default=None, + dest='disk_provisioned_iops', + help=( + 'The provisioned IOPS of the disk. If not set, the Dataflow service' + ' will choose a reasonable default.' + ), + ) + parser.add_argument( + '--disk_provisioned_throughput_mibps', + type=int, + default=None, + dest='disk_provisioned_throughput_mibps', + help=( + 'The provisioned throughput of the disk in MiB/s. If not set, the' + ' Dataflow service will choose a reasonable default.' 
+ ), + ) parser.add_argument( '--worker_region', default=None, diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index c683c9625272..3e357025d914 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -444,12 +444,18 @@ def test_worker_options(self): 'abc', '--disk_type', 'def', + '--disk_provisioned_iops', + '4000', + '--disk_provisioned_throughput_mibps', + '200', '--element_processing_timeout_minutes', '10', ]) worker_options = options.view_as(WorkerOptions) self.assertEqual(worker_options.machine_type, 'abc') self.assertEqual(worker_options.disk_type, 'def') + self.assertEqual(worker_options.disk_provisioned_iops, 4000) + self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200) self.assertEqual(worker_options.element_processing_timeout_minutes, 10) options = PipelineOptions( diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 164ace532b23..c749402ae7ff 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -205,6 +205,12 @@ def __init__( pool.diskSizeGb = self.worker_options.disk_size_gb if self.worker_options.disk_type: pool.diskType = self.worker_options.disk_type + if self.worker_options.disk_provisioned_iops: + pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops + if self.worker_options.disk_provisioned_throughput_mibps: + pool.diskProvisionedThroughputMibps = ( + self.worker_options.disk_provisioned_throughput_mibps + ) if self.worker_options.zone: pool.zone = self.worker_options.zone if self.worker_options.network: diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 
b767cef86b2e..f346887029f3 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -128,6 +128,26 @@ def test_set_subnetwork(self): env.proto.workerPools[0].subnetwork, '/regions/MY/subnetworks/SUBNETWORK') + def test_set_disk_provisioning_options(self): + pipeline_options = PipelineOptions([ + '--disk_provisioned_iops', + '4000', + '--disk_provisioned_throughput_mibps', + '200', + '--temp_location', + 'gs://any-location/temp', + ]) + env = apiclient.Environment( + [], # packages + pipeline_options, + '2.0.0', # any environment version + FAKE_PIPELINE_URL, + ) + self.assertEqual(env.proto.workerPools[0].diskProvisionedIops, 4000) + self.assertEqual( + env.proto.workerPools[0].diskProvisionedThroughputMibps, 200 + ) + def test_flexrs_blank(self): pipeline_options = PipelineOptions( ['--temp_location', 'gs://any-location/temp']) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py index c0bbfa74ac1e..dc648c7dfe4b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -2583,6 +2583,11 @@ class FlexTemplateRuntimeEnvironment(_messages.Message): zone](https://cloud.google.com/compute/docs/regions-zones/regions-zones) for launching worker instances to run your pipeline. In the future, worker_zone will take precedence. + diskProvisionedIops: Provisioned IOPS for the root disk for VMs. If zero or + unspecified, the service will attempt to choose a reasonable default. + diskProvisionedThroughputMibps: Provisioned throughput for the root disk + for VMs. If zero or unspecified, the service will attempt to choose a + reasonable default. 
""" class AutoscalingAlgorithmValueValuesEnum(_messages.Enum): r"""The algorithm to use for autoscaling @@ -2701,6 +2706,10 @@ class AdditionalProperty(_messages.Message): workerRegion = _messages.StringField(23) workerZone = _messages.StringField(24) zone = _messages.StringField(25) + diskProvisionedIops = _messages.IntegerField( + 26, variant=_messages.Variant.INT64) + diskProvisionedThroughputMibps = _messages.IntegerField( + 27, variant=_messages.Variant.INT64) class FloatingPointList(_messages.Message): @@ -7720,6 +7729,10 @@ class AdditionalProperty(_messages.Message): teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20) workerHarnessContainerImage = _messages.StringField(21) zone = _messages.StringField(22) + diskProvisionedIops = _messages.IntegerField( + 23, variant=_messages.Variant.INT64) + diskProvisionedThroughputMibps = _messages.IntegerField( + 24, variant=_messages.Variant.INT64) class WorkerSettings(_messages.Message):