From 37cfc4e8d4f922a592abd2c87034107d2eb33234 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Feb 2026 13:23:34 -0500 Subject: [PATCH] Remove test-only functions in GcsUtil and change related unit tests to use V1. --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 156 +------------ .../sdk/extensions/gcp/util/GcsUtilTest.java | 207 ++++++++---------- 2 files changed, 100 insertions(+), 263 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 33399ef87b63..396fde452987 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -17,15 +17,10 @@ */ package org.apache.beam.sdk.extensions.gcp.util; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; import com.google.api.gax.paging.Page; -import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; -import com.google.auth.Credentials; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage.BlobGetOption; @@ -38,9 +33,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.function.Supplier; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; @@ -87,73 +79,31 @@ public static GcsCountersOptions create( public static class GcsUtilFactory implements DefaultValueFactory { @Override public GcsUtil create(PipelineOptions options) { - GcsOptions gcsOptions = options.as(GcsOptions.class); - Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); - return new GcsUtil( - storageBuilder.build(), - storageBuilder.getHttpRequestInitializer(), - gcsOptions.getExecutorService(), - ExperimentalOptions.hasExperiment(options, "use_grpc_for_gcs"), - gcsOptions.getGcpCredential(), - gcsOptions.getGcsUploadBufferSizeBytes(), - gcsOptions.getGcsRewriteDataOpBatchLimit(), - GcsCountersOptions.create( - gcsOptions.getEnableBucketReadMetricCounter() - ? gcsOptions.getGcsReadCounterPrefix() - : null, - gcsOptions.getEnableBucketWriteMetricCounter() - ? gcsOptions.getGcsWriteCounterPrefix() - : null), - gcsOptions); + return new GcsUtil(options); } } + /** @deprecated use {@link GcsPath#getNonWildcardPrefix(String)} instead. */ + @Deprecated public static String getNonWildcardPrefix(String globExp) { return GcsPath.getNonWildcardPrefix(globExp); } + /** @deprecated use {@link GcsPath#isWildcard(GcsPath)} instead. */ + @Deprecated public static boolean isWildcard(GcsPath spec) { return GcsPath.isWildcard(spec); } - @VisibleForTesting - GcsUtil( - Storage storageClient, - HttpRequestInitializer httpRequestInitializer, - ExecutorService executorService, - Boolean shouldUseGrpc, - Credentials credentials, - @Nullable Integer uploadBufferSizeBytes, - @Nullable Integer rewriteDataOpBatchLimit, - GcsCountersOptions gcsCountersOptions, - GcsOptions gcsOptions) { - this.delegate = - new GcsUtilV1( - storageClient, - httpRequestInitializer, - executorService, - shouldUseGrpc, - credentials, - uploadBufferSizeBytes, - rewriteDataOpBatchLimit, - gcsCountersOptions.delegate, - gcsOptions); - - if (ExperimentalOptions.hasExperiment(gcsOptions, "use_gcsutil_v2")) { - this.delegateV2 = new GcsUtilV2(gcsOptions); + GcsUtil(PipelineOptions options) { + this.delegate = new GcsUtilV1.GcsUtilFactory().create(options); + if (ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2")) { + this.delegateV2 = new GcsUtilV2.GcsUtilFactory().create(options); } else { this.delegateV2 = null; } } - protected void setStorageClient(Storage storageClient) { - delegate.setStorageClient(storageClient); - } - - protected void setBatchRequestSupplier(Supplier supplier) { - delegate.setBatchRequestSupplier(supplier); - } - public List expand(GcsPath gcsPattern) throws IOException { if (delegateV2 != null) { return delegateV2.expand(gcsPattern); @@ -161,12 +111,6 @@ public List expand(GcsPath gcsPattern) throws IOException { return delegate.expand(gcsPattern); } - @VisibleForTesting - @Nullable - Integer getUploadBufferSizeBytes() { - return delegate.getUploadBufferSizeBytes(); - } - public long fileSize(GcsPath path) throws IOException { if (delegateV2 != null) { return delegateV2.fileSize(path); @@ -180,13 +124,6 @@ public StorageObject getObject(GcsPath gcsPath) throws IOException { return delegate.getObject(gcsPath); } - /** @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}. */ - @Deprecated - @VisibleForTesting - StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { - return delegate.getObject(gcsPath, backoff, sleeper); - } - public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws IOException { if (delegateV2 != null) { return delegateV2.getBlob(gcsPath, options); @@ -248,11 +185,6 @@ public Page listBlobs( throw new IOException("GcsUtil V2 not initialized."); } - @VisibleForTesting - List fileSizes(List paths) throws IOException { - return delegate.fileSizes(paths); - } - public SeekableByteChannel open(GcsPath path) throws IOException { return delegate.open(path); } @@ -389,50 +321,6 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException { } } - @VisibleForTesting - boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - return delegate.bucketAccessible(path, backoff, sleeper); - } - - @VisibleForTesting - void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - delegate.verifyBucketAccessible(path, backoff, sleeper); - } - - @VisibleForTesting - @Nullable - Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - return delegate.getBucket(path, backoff, sleeper); - } - - @VisibleForTesting - void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) - throws IOException { - delegate.createBucket(projectId, bucket, backoff, sleeper); - } - - @VisibleForTesting - void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { - delegate.removeBucket(bucket, backoff, sleeper); - } - - @VisibleForTesting - List makeGetBatches( - Collection paths, List results) throws IOException { - List legacyResults = new java.util.ArrayList<>(); - List legacyBatch = delegate.makeGetBatches(paths, legacyResults); - - for (GcsUtilV1.StorageObjectOrIOException[] legacyResult : legacyResults) { - StorageObjectOrIOException[] result = new StorageObjectOrIOException[legacyResult.length]; - for (int i = 0; i < legacyResult.length; ++i) { - result[i] = StorageObjectOrIOException.fromLegacy(legacyResult[i]); - } - results.add(result); - } - - return legacyBatch; - } - public void copy(Iterable srcFilenames, Iterable destFilenames) throws IOException { delegate.copy(srcFilenames, destFilenames); @@ -497,32 +385,6 @@ public void rename( } } - @VisibleForTesting - @SuppressWarnings("JdkObsolete") // for LinkedList - java.util.LinkedList makeRewriteOps( - Iterable srcFilenames, - Iterable destFilenames, - boolean deleteSource, - boolean ignoreMissingSource, - boolean ignoreExistingDest) - throws IOException { - return delegate.makeRewriteOps( - srcFilenames, destFilenames, deleteSource, ignoreMissingSource, ignoreExistingDest); - } - - @VisibleForTesting - @SuppressWarnings("JdkObsolete") // for LinkedList - List makeRewriteBatches( - java.util.LinkedList rewrites) throws IOException { - return delegate.makeRewriteBatches(rewrites); - } - - @VisibleForTesting - List makeRemoveBatches(Collection filenames) - throws IOException { - return delegate.makeRemoveBatches(filenames); - } - public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index 0b02e11eade9..a2b0e0af502b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -151,7 +151,7 @@ public void testCreationWithDefaultOptions() { public void testUploadBufferSizeDefault() { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil util = pipelineOptions.getGcsUtil(); - assertNull(util.getUploadBufferSizeBytes()); + assertNull(util.delegate.getUploadBufferSizeBytes()); } @Test @@ -159,7 +159,7 @@ public void testUploadBufferSizeUserSpecified() { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); pipelineOptions.setGcsUploadBufferSizeBytes(12345); GcsUtil util = pipelineOptions.getGcsUtil(); - assertEquals((Integer) 12345, util.getUploadBufferSizeBytes()); + assertEquals((Integer) 12345, util.delegate.getUploadBufferSizeBytes()); } @Test @@ -243,7 +243,7 @@ public void testGlobExpansion() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); @@ -332,7 +332,7 @@ public void testRecursiveGlobExpansion() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); @@ -383,7 +383,7 @@ public void testNonExistentObjectReturnsEmptyResult() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); @@ -408,7 +408,7 @@ public void testAccessDeniedObjectThrowsIOException() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); @@ -436,7 +436,7 @@ public void testFileSizeNonBatch() throws Exception { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); @@ -461,7 +461,7 @@ public void testFileSizeWhenFileNotFoundNonBatch() throws Exception { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.delegate.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); thrown.expect(FileNotFoundException.class); gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")); @@ -473,7 +473,7 @@ public void testRetryFileSizeNonBatch() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); @@ -491,6 +491,7 @@ public void testRetryFileSizeNonBatch() throws IOException { assertEquals( 1000, gcsUtil + .delegate .getObject( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, @@ -546,8 +547,8 @@ public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); - gcsUtil.fileSizes( + gcsUtil.delegate.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.delegate.fileSizes( ImmutableList.of( GcsPath.fromComponents("testbucket", "testobject"), GcsPath.fromComponents("testbucket", "testobject2"))); @@ -567,8 +568,9 @@ public void testGetSizeBytesWhenFileNotFoundNoBatch() throws Exception { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); - gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + gcsUtil.delegate.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.delegate.fileSizes( + ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); } @Test @@ -637,9 +639,9 @@ public LowLevelHttpResponse execute() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient( + gcsUtil.delegate.setStorageClient( new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); - gcsUtil.fileSizes( + gcsUtil.delegate.fileSizes( ImmutableList.of( GcsPath.fromComponents("testbucket", "testobject"), GcsPath.fromComponents("testbucket", "testobject2"))); @@ -678,9 +680,10 @@ public LowLevelHttpResponse execute() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient( + gcsUtil.delegate.setStorageClient( new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); - gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + gcsUtil.delegate.fileSizes( + ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); } @Test @@ -725,7 +728,7 @@ public LowLevelHttpResponse execute() throws IOException { new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient( + gcsUtil.delegate.setStorageClient( new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); gcsUtil.remove(Arrays.asList("gs://some-bucket/already-deleted")); } @@ -737,7 +740,7 @@ public void testCreateBucket() throws IOException { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); @@ -750,7 +753,8 @@ public void testCreateBucket() throws IOException { .thenThrow(new SocketTimeoutException("SocketException")) .thenReturn(new Bucket()); - gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep); + gcsUtil.delegate.createBucket( + "a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep); } @Test @@ -759,7 +763,7 @@ public void testCreateBucketAccessErrors() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); @@ -778,7 +782,8 @@ public void testCreateBucketAccessErrors() throws IOException { thrown.expect(AccessDeniedException.class); - gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep); + gcsUtil.delegate.createBucket( + "a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep); } @Test @@ -787,7 +792,7 @@ public void testBucketAccessible() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -801,7 +806,7 @@ public void testBucketAccessible() throws IOException { .thenReturn(new Bucket()); assertTrue( - gcsUtil.bucketAccessible( + gcsUtil.delegate.bucketAccessible( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep)); @@ -813,7 +818,7 @@ public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -830,7 +835,7 @@ public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { when(mockStorageGet.execute()).thenThrow(expectedException); assertFalse( - gcsUtil.bucketAccessible( + gcsUtil.delegate.bucketAccessible( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep)); @@ -842,7 +847,7 @@ public void testBucketDoesNotExist() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -857,7 +862,7 @@ public void testBucketDoesNotExist() throws IOException { HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see")); assertFalse( - gcsUtil.bucketAccessible( + gcsUtil.delegate.bucketAccessible( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep)); @@ -869,7 +874,7 @@ public void testVerifyBucketAccessible() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -882,7 +887,7 @@ public void testVerifyBucketAccessible() throws IOException { .thenThrow(new SocketTimeoutException("SocketException")) .thenReturn(new Bucket()); - gcsUtil.verifyBucketAccessible( + gcsUtil.delegate.verifyBucketAccessible( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep); @@ -894,7 +899,7 @@ public void testVerifyBucketAccessibleAccessError() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -910,7 +915,7 @@ public void testVerifyBucketAccessibleAccessError() throws IOException { when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); when(mockStorageGet.execute()).thenThrow(expectedException); - gcsUtil.verifyBucketAccessible( + gcsUtil.delegate.verifyBucketAccessible( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep); @@ -922,7 +927,7 @@ public void testVerifyBucketAccessibleDoesNotExist() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -936,7 +941,7 @@ public void testVerifyBucketAccessibleDoesNotExist() throws IOException { googleJsonResponseException( HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see")); - gcsUtil.verifyBucketAccessible( + gcsUtil.delegate.verifyBucketAccessible( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep); @@ -948,7 +953,7 @@ public void testGetBucket() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -962,7 +967,7 @@ public void testGetBucket() throws IOException { .thenReturn(new Bucket()); assertNotNull( - gcsUtil.getBucket( + gcsUtil.delegate.getBucket( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep)); @@ -974,7 +979,7 @@ public void testGetBucketNotExists() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); @@ -990,7 +995,7 @@ public void testGetBucketNotExists() throws IOException { thrown.expect(FileNotFoundException.class); thrown.expectMessage("It don't exist"); - gcsUtil.getBucket( + gcsUtil.delegate.getBucket( GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper()::sleep); @@ -1147,7 +1152,8 @@ public void testMakeRewriteOps() throws IOException { GcsUtil gcsUtil = gcsOptions.getGcsUtil(); LinkedList rewrites = - gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1), false, false, false); + gcsUtil.delegate.makeRewriteOps( + makeStrings("s", 1), makeStrings("d", 1), false, false, false); assertEquals(1, rewrites.size()); RewriteOp rewrite = rewrites.pop(); @@ -1167,7 +1173,8 @@ public void testMakeRewriteOpsWithOptions() throws IOException { gcsUtil.delegate.maxBytesRewrittenPerCall = 1337L; LinkedList rewrites = - gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1), false, false, false); + gcsUtil.delegate.makeRewriteOps( + makeStrings("s", 1), makeStrings("d", 1), false, false, false); assertEquals(1, rewrites.size()); RewriteOp rewrite = rewrites.pop(); @@ -1182,23 +1189,24 @@ public void testMakeRewriteBatches() throws IOException { // Small number of files fits in 1 batch List batches = - gcsUtil.makeRewriteBatches( - gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 3), false, false, false)); + gcsUtil.delegate.makeRewriteBatches( + gcsUtil.delegate.makeRewriteOps( + makeStrings("s", 3), makeStrings("d", 3), false, false, false)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); // 1 batch of files fits in 1 batch batches = - gcsUtil.makeRewriteBatches( - gcsUtil.makeRewriteOps( + gcsUtil.delegate.makeRewriteBatches( + gcsUtil.delegate.makeRewriteOps( makeStrings("s", 100), makeStrings("d", 100), false, false, false)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(100)); // A little more than 5 batches of files fits in 6 batches batches = - gcsUtil.makeRewriteBatches( - gcsUtil.makeRewriteOps( + gcsUtil.delegate.makeRewriteBatches( + gcsUtil.delegate.makeRewriteOps( makeStrings("s", 501), makeStrings("d", 501), false, false, false)); assertThat(batches.size(), equalTo(6)); assertThat(sumBatchSizes(batches), equalTo(501)); @@ -1212,15 +1220,16 @@ public void testMakeRewriteBatchesWithLowerDataOpLimit() throws IOException { // Small number of files in same bucket fits in 1 batch List batches = - gcsUtil.makeRewriteBatches( - gcsUtil.makeRewriteOps(makeStrings("s", 5), makeStrings("d", 5), false, false, false)); + gcsUtil.delegate.makeRewriteBatches( + gcsUtil.delegate.makeRewriteOps( + makeStrings("s", 5), makeStrings("d", 5), false, false, false)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(5)); // Files copying between buckets use smaller batch size batches = - gcsUtil.makeRewriteBatches( - gcsUtil.makeRewriteOps( + gcsUtil.delegate.makeRewriteBatches( + gcsUtil.delegate.makeRewriteOps( makeStrings("bucket1", "s", 5), makeStrings("bucket2", "d", 5), false, @@ -1240,7 +1249,8 @@ public void testMakeRewriteBatchesWithLowerDataOpLimit() throws IOException { toFiles.addAll(makeStrings("bucket5", "g", 1)); batches = - gcsUtil.makeRewriteBatches(gcsUtil.makeRewriteOps(fromFiles, toFiles, false, false, false)); + gcsUtil.delegate.makeRewriteBatches( + gcsUtil.delegate.makeRewriteOps(fromFiles, toFiles, false, false, false)); assertThat(batches.size(), equalTo(4)); assertThat(batches.get(0).size(), equalTo(91)); assertThat(sumBatchSizes(batches), equalTo(97)); @@ -1252,7 +1262,7 @@ public void testMakeRewriteOpsInvalid() throws IOException { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Number of source files 3"); - gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1), false, false, false); + gcsUtil.delegate.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1), false, false, false); } private class FakeBatcher implements BatchInterface { @@ -1318,8 +1328,8 @@ public void testRename() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class); @@ -1350,8 +1360,8 @@ public void testRenameIgnoringMissing() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Rewrite mockStorageRewrite1 = Mockito.mock(Storage.Objects.Rewrite.class); @@ -1380,8 +1390,8 @@ public void testRenamePropagateMissingException() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class); @@ -1401,8 +1411,8 @@ public void testRenameSkipDestinationExistsSameBucket() throws IOException { GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class); @@ -1426,7 +1436,7 @@ public void testRenameSkipDestinationExistsDifferentBucket() throws IOException GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); + gcsUtil.delegate.setStorageClient(mockStorage); assertThrows( UnsupportedOperationException.class, @@ -1444,8 +1454,8 @@ public void testThrowRetentionPolicyNotMetErrorWhenUnequalChecksum() throws IOEx GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockGetRequest1 = Mockito.mock(Storage.Objects.Get.class); @@ -1476,8 +1486,8 @@ public void testIgnoreRetentionPolicyNotMetErrorWhenEqualChecksum() throws IOExc GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class); @@ -1518,17 +1528,17 @@ public void testMakeRemoveBatches() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); // Small number of files fits in 1 batch - List batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); + List batches = gcsUtil.delegate.makeRemoveBatches(makeStrings("s", 3)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); // 1 batch of files fits in 1 batch - batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); + batches = gcsUtil.delegate.makeRemoveBatches(makeStrings("s", 100)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(100)); // A little more than 5 batches of files fits in 6 batches - batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); + batches = gcsUtil.delegate.makeRemoveBatches(makeStrings("s", 501)); assertThat(batches.size(), equalTo(6)); assertThat(sumBatchSizes(batches), equalTo(501)); } @@ -1538,22 +1548,22 @@ public void testMakeGetBatches() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); // Small number of files fits in 1 batch - List results = Lists.newArrayList(); - List batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); + List results = Lists.newArrayList(); + List batches = gcsUtil.delegate.makeGetBatches(makeGcsPaths("s", 3), results); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); assertEquals(3, results.size()); // 1 batch of files fits in 1 batch results = Lists.newArrayList(); - batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results); + batches = gcsUtil.delegate.makeGetBatches(makeGcsPaths("s", 100), results); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(100)); assertEquals(100, results.size()); // A little more than 5 batches of files fits in 6 batches results = Lists.newArrayList(); - batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results); + batches = gcsUtil.delegate.makeGetBatches(makeGcsPaths("s", 501), results); assertThat(batches.size(), equalTo(6)); assertThat(sumBatchSizes(batches), equalTo(501)); assertEquals(501, results.size()); @@ -1564,8 +1574,8 @@ public void testGetObjects() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class); @@ -1584,8 +1594,8 @@ public void testGetObjectsWithException() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockGetRequest = Mockito.mock(Storage.Objects.Get.class); @@ -1610,8 +1620,8 @@ public void testListObjectsException() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - gcsUtil.setBatchRequestSupplier(FakeBatcher::new); + gcsUtil.delegate.setStorageClient(mockStorage); + gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new); Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); when(mockStorage.objects()).thenReturn(mockStorageObjects); @@ -1638,46 +1648,11 @@ public static GcsUtilMock createMockWithMockStorage(PipelineOptions options, byt } public static GcsUtilMock createMock(PipelineOptions options) { - GcsOptions gcsOptions = options.as(GcsOptions.class); - Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); - return new GcsUtilMock( - storageBuilder.build(), - storageBuilder.getHttpRequestInitializer(), - gcsOptions.getExecutorService(), - hasExperiment(options, "use_grpc_for_gcs"), - gcsOptions.getGcpCredential(), - gcsOptions.getGcsUploadBufferSizeBytes(), - gcsOptions.getGcsRewriteDataOpBatchLimit(), - GcsUtil.GcsCountersOptions.create( - gcsOptions.getEnableBucketReadMetricCounter() - ? gcsOptions.getGcsReadCounterPrefix() - : null, - gcsOptions.getEnableBucketWriteMetricCounter() - ? gcsOptions.getGcsWriteCounterPrefix() - : null), - gcsOptions); + return new GcsUtilMock(options); } - private GcsUtilMock( - Storage storageClient, - HttpRequestInitializer httpRequestInitializer, - ExecutorService executorService, - Boolean shouldUseGrpc, - Credentials credentials, - @Nullable Integer uploadBufferSizeBytes, - @Nullable Integer rewriteDataOpBatchLimit, - GcsUtil.GcsCountersOptions gcsCountersOptions, - GcsOptions gcsOptions) { - super( - storageClient, - httpRequestInitializer, - executorService, - shouldUseGrpc, - credentials, - uploadBufferSizeBytes, - rewriteDataOpBatchLimit, - gcsCountersOptions, - gcsOptions); + private GcsUtilMock(PipelineOptions options) { + super(options); } }