From 8e422a17b977ea8541f4b4052b17a6ddda2b319d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 10 Feb 2026 10:55:40 -0500 Subject: [PATCH 01/11] Add copy() and remove() for GcsUtil V2. --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 20 ++++ .../sdk/extensions/gcp/util/GcsUtilV2.java | 105 ++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index a2fdf24e9fbc..f37e512f4531 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -30,6 +30,7 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BucketGetOption; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -40,7 +41,9 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV1.StorageObjectOrIOException; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.CopyStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -433,6 +436,15 @@ public void copy(Iterable srcFilenames, Iterable destFilenames) delegate.copy(srcFilenames, destFilenames); } + public void copy(Iterable srcPaths, Iterable dstPaths, CopyStrategy strategy) + throws IOException { + if (delegateV2 != null) { + delegateV2.copy(srcPaths, dstPaths, strategy); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + public void rename( Iterable srcFilenames, Iterable destFilenames, MoveOptions... moveOptions) throws IOException { @@ -469,6 +481,14 @@ public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } + public void remove(Iterable paths, BlobSourceOption... options) throws IOException { + if (delegateV2 != null) { + delegateV2.remove(paths, options); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION") public static class StorageObjectOrIOException { final GcsUtilV1.StorageObjectOrIOException delegate; diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index a2df45511c95..7de21e314aec 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -18,19 +18,25 @@ package org.apache.beam.sdk.extensions.gcp.util; import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.api.gax.paging.Page; import com.google.auto.value.AutoValue; import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.CopyWriter; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; +import com.google.cloud.storage.Storage.CopyRequest; import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageException; @@ -71,6 +77,12 @@ public GcsUtilV2 create(PipelineOptions options) { /** Maximum number of requests permitted in a GCS batch request. */ private static final int MAX_REQUESTS_PER_BATCH = 100; + /** + * Limit the number of bytes Cloud Storage will attempt to copy before responding to an individual + * request. If you see Read Timeout errors, try reducing this value. + */ + private static final long MEGABYTES_COPIED_PER_CHUNK = 2048L; + GcsUtilV2(PipelineOptions options) { String projectId = options.as(GcpOptions.class).getProject(); storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); @@ -259,6 +271,99 @@ public List expand(GcsPath gcsPattern) throws IOException { return results; } + public void remove(Iterable paths, BlobSourceOption... options) throws IOException { + for (List pathPartition : + Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { + + // Create a new empty batch every time + StorageBatch batch = storage.batch(); + List> batchResultFutures = new ArrayList<>(); + + for (GcsPath path : pathPartition) { + batchResultFutures.add(batch.delete(path.getBucket(), path.getObject(), options)); + } + batch.submit(); + + for (int i = 0; i < batchResultFutures.size(); i++) { + StorageBatchResult future = batchResultFutures.get(i); + try { + Boolean deleted = future.get(); + if (!deleted) { + throw new FileNotFoundException( + String.format( + "The specified file does not exist: %s", pathPartition.get(i).toString())); + } + } catch (StorageException e) { + throw translateStorageException(pathPartition.get(i), e); + } + } + } + } + + public enum CopyStrategy { + NO_OVERWRITE, // Fail if target exists + SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic) + UNSAFE_OVERWRITE // Overwrite regardless of state + } + + public void copy(Iterable srcPaths, Iterable dstPaths, CopyStrategy strategy) + throws IOException { + List srcList = Lists.newArrayList(srcPaths); + List dstList = Lists.newArrayList(dstPaths); + checkArgument( + srcList.size() == dstList.size(), + "Number of source files %s must equal number of destination files %s", + srcList.size(), + dstList.size()); + + for (int i = 0; i < srcList.size(); i++) { + GcsPath srcPath = srcList.get(i); + GcsPath dstPath = dstList.get(i); + BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject()); + BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject()); + + CopyRequest.Builder copyRequestBuilder = + CopyRequest.newBuilder() + .setSource(srcId) + .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK); // 2GiB + + if (strategy == CopyStrategy.UNSAFE_OVERWRITE) { + copyRequestBuilder.setTarget(dstId); + } else { + // Both NO_OVERWRITE and SAFE_OVERWRITE require checking the existing blob + BlobInfo existingTarget = null; + try { + existingTarget = storage.get(dstId); + } catch (StorageException e) { + throw translateStorageException(dstPath, e); + } + + Storage.BlobTargetOption precondition; + if (existingTarget == null) { + precondition = Storage.BlobTargetOption.doesNotExist(); + } else if (strategy == CopyStrategy.SAFE_OVERWRITE) { + // SAFE_OVERWRITE: match the generation of the existing object + precondition = Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()); + } else { + // Target exists and we aren't allowed to overwrite (strategy == NO_OVERWRITE) + throw new FileAlreadyExistsException( + srcPath.toString(), + dstPath.toString(), + "Target object already exists and CopyStrategy is NO_OVERWRITE"); + } + copyRequestBuilder.setTarget(dstId, precondition); + } + + CopyWriter copyWriter = storage.copy(copyRequestBuilder.build()); + + try { + copyWriter.getResult(); + } catch (StorageException e) { + throw translateStorageException(srcPath, e); + } + } + } + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException { String bucketName = path.getBucket(); From fcc571eb1ba101b7a4ae6c851583e51cd6cb5bd9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 10 Feb 2026 15:24:16 -0500 Subject: [PATCH 02/11] Add tests and modify copy and remove to take strategies --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 11 +- .../sdk/extensions/gcp/util/GcsUtilV2.java | 37 +++-- .../gcp/util/GcsUtilParameterizedIT.java | 155 ++++++++++++++++++ .../gcp/util/gcsfs/GcsPathTest.java | 2 + 4 files changed, 187 insertions(+), 18 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index f37e512f4531..4fd9de95eeea 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -30,7 +30,6 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; -import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BucketGetOption; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -43,7 +42,8 @@ import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV1.StorageObjectOrIOException; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult; -import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.CopyStrategy; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -436,7 +436,8 @@ public void copy(Iterable srcFilenames, Iterable destFilenames) delegate.copy(srcFilenames, destFilenames); } - public void copy(Iterable srcPaths, Iterable dstPaths, CopyStrategy strategy) + public void copy( + Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) throws IOException { if (delegateV2 != null) { delegateV2.copy(srcPaths, dstPaths, strategy); @@ -481,9 +482,9 @@ public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } - public void remove(Iterable paths, BlobSourceOption... options) throws IOException { + public void remove(Iterable paths, MissingStrategy strategy) throws IOException { if (delegateV2 != null) { - delegateV2.remove(paths, options); + delegateV2.remove(paths, strategy); } else { throw new IOException("GcsUtil V2 not initialized."); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 7de21e314aec..3429ea93b98e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -33,7 +33,6 @@ import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; -import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; import com.google.cloud.storage.Storage.CopyRequest; @@ -89,12 +88,15 @@ public GcsUtilV2 create(PipelineOptions options) { } @SuppressWarnings({ - "nullness" // For Creating AccessDeniedException and FileAlreadyExistsException with null. + "nullness" // For Creating AccessDeniedException FileNotFoundException, and + // FileAlreadyExistsException with null. }) private IOException translateStorageException(GcsPath gcsPath, StorageException e) { switch (e.getCode()) { case 403: return new AccessDeniedException(gcsPath.toString(), null, e.getMessage()); + case 404: + return new FileNotFoundException(e.getMessage()); case 409: return new FileAlreadyExistsException(gcsPath.toString(), null, e.getMessage()); default: @@ -271,7 +273,12 @@ public List expand(GcsPath gcsPattern) throws IOException { return results; } - public void remove(Iterable paths, BlobSourceOption... options) throws IOException { + public enum MissingStrategy { + IGNORE_MISSING_TARGET, + FAIL_ON_MISSING_TARGET, + } + + public void remove(Iterable paths, MissingStrategy strategy) throws IOException { for (List pathPartition : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { @@ -280,7 +287,7 @@ public void remove(Iterable paths, BlobSourceOption... options) throws List> batchResultFutures = new ArrayList<>(); for (GcsPath path : pathPartition) { - batchResultFutures.add(batch.delete(path.getBucket(), path.getObject(), options)); + batchResultFutures.add(batch.delete(path.getBucket(), path.getObject())); } batch.submit(); @@ -289,9 +296,13 @@ public void remove(Iterable paths, BlobSourceOption... options) throws try { Boolean deleted = future.get(); if (!deleted) { - throw new FileNotFoundException( - String.format( - "The specified file does not exist: %s", pathPartition.get(i).toString())); + if (strategy == MissingStrategy.FAIL_ON_MISSING_TARGET) { + throw new FileNotFoundException( + String.format( + "The specified file does not exist: %s", pathPartition.get(i).toString())); + } else { + LOG.warn("Ignoring failed deletion on file {}.", pathPartition.get(i).toString()); + } } } catch (StorageException e) { throw translateStorageException(pathPartition.get(i), e); @@ -300,13 +311,14 @@ public void remove(Iterable paths, BlobSourceOption... options) throws } } - public enum CopyStrategy { + public enum OverwriteStrategy { NO_OVERWRITE, // Fail if target exists SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic) UNSAFE_OVERWRITE // Overwrite regardless of state } - public void copy(Iterable srcPaths, Iterable dstPaths, CopyStrategy strategy) + public void copy( + Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) throws IOException { List srcList = Lists.newArrayList(srcPaths); List dstList = Lists.newArrayList(dstPaths); @@ -327,7 +339,7 @@ public void copy(Iterable srcPaths, Iterable dstPaths, CopyStr .setSource(srcId) .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK); // 2GiB - if (strategy == CopyStrategy.UNSAFE_OVERWRITE) { + if (strategy == OverwriteStrategy.UNSAFE_OVERWRITE) { copyRequestBuilder.setTarget(dstId); } else { // Both NO_OVERWRITE and SAFE_OVERWRITE require checking the existing blob @@ -341,7 +353,7 @@ public void copy(Iterable srcPaths, Iterable dstPaths, CopyStr Storage.BlobTargetOption precondition; if (existingTarget == null) { precondition = Storage.BlobTargetOption.doesNotExist(); - } else if (strategy == CopyStrategy.SAFE_OVERWRITE) { + } else if (strategy == OverwriteStrategy.SAFE_OVERWRITE) { // SAFE_OVERWRITE: match the generation of the existing object precondition = Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()); } else { @@ -354,9 +366,8 @@ public void copy(Iterable srcPaths, Iterable dstPaths, CopyStr copyRequestBuilder.setTarget(dstId, precondition); } - CopyWriter copyWriter = storage.copy(copyRequestBuilder.build()); - try { + CopyWriter copyWriter = storage.copy(copyRequestBuilder.build()); copyWriter.getResult(); } catch (StorageException e) { throw translateStorageException(srcPath, e); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index db5097c95155..dab41ef4843a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.testing.TestPipeline; @@ -297,4 +299,157 @@ public void testCreateAndRemoveBucket() throws IOException { } } } + + @Test + public void testCopyAndRemove() throws IOException { + String existingBucket = "apache-beam-temp-bucket-12345"; + List srcPaths = + Arrays.asList( + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt")); + + // create a bucket first + try { + createBucketHelper(existingBucket); + if (experiment.equals("use_gcsutil_v2")) { + testCopyAndRemoveV2(existingBucket, srcPaths); + } else { + testCopyAndRemoveV1(existingBucket, srcPaths); + } + } catch (IOException e) { + throw e; + } finally { + tearDownBucketHelper(existingBucket); + } + } + + private void testCopyAndRemoveV2(String existingBucket, List srcPaths) + throws IOException { + String nonExistentBucket = "my-random-test-bucket-12345"; + List dstPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, o.getObject())) + .collect(Collectors.toList()); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // copy from existing files to an existing bucket + // No exception on SAFE_OVERWRITE and UNSAFE_OVERWRITE + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.UNSAFE_OVERWRITE); + + // raise exception on NO_OVERWRITE + assertThrows( + FileAlreadyExistsException.class, + () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.NO_OVERWRITE)); + + // remove the existing files + gcsUtil.remove(dstPaths, MissingStrategy.IGNORE_MISSING_TARGET); + + // remove the already-deleted files + // No exception on IGNORE_MISSING_TARGET + gcsUtil.remove(dstPaths, MissingStrategy.IGNORE_MISSING_TARGET); + + // raise exception on FAIL_ON_MISSING_TARGET + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.remove(dstPaths, MissingStrategy.FAIL_ON_MISSING_TARGET)); + + final List wrongDstPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + // copy from existing files to an nonexistent bucket. Raise exception. + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.copy(srcPaths, wrongDstPaths, OverwriteStrategy.SAFE_OVERWRITE)); + + // remove files from an nonexistent bucket. No exception. + gcsUtil.remove(wrongDstPaths, MissingStrategy.IGNORE_MISSING_TARGET); + + final List wrongSrcPaths = + Arrays.asList(GcsPath.fromUri("gs://apache-beam-samples/shakespeare/some-random-name.txt")); + + // missing source file. Raise exception. + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.copy(wrongSrcPaths, wrongSrcPaths, OverwriteStrategy.SAFE_OVERWRITE)); + } + + private void testCopyAndRemoveV1(String existingBucket, List srcPaths) + throws IOException { + String nonExistentBucket = "my-random-test-bucket-12345"; + List srcList = srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + + // copy from existing files to an existing bucket + List dstList = + srcPaths.stream() + .map(o -> String.format("gs://%s/%s", existingBucket, o.getObject())) + .collect(Collectors.toList()); + gcsUtil.copy(srcList, dstList); + + assertExists(GcsPath.fromUri(dstList.get(0))); + assertExists(GcsPath.fromUri(dstList.get(1))); + + // copy from existing files to an existing bucket, but target files exist. No exception. + gcsUtil.copy(srcList, dstList); + + // remove the existing files + gcsUtil.remove(dstList); + + // remove the already-deleted files. No exception. + gcsUtil.remove(dstList); + + final List wrongDstList = + srcPaths.stream() + .map(o -> String.format("gs://%s/%s", nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + // copy from existing files to an nonexistent bucket. Raise exception. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList, wrongDstList)); + + // remove files from an nonexistent bucket. No exception. + gcsUtil.remove(dstList); + + final List wrongSrcList = + Arrays.asList("gs://apache-beam-samples/shakespeare/some-random-name.txt"); + + // missing source file, Raise exception + assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(wrongSrcList, wrongSrcList)); + } + + private void createBucketHelper(String bucketName) throws IOException { + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.createBucket(BucketInfo.of(bucketName)); + } else { + GcsOptions gcsOptions = options.as(GcsOptions.class); + gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); + } + } + + private void tearDownBucketHelper(String bucketName) { + try { + // Use "**" in the pattern to match any characters including "/". + List paths = + gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**", bucketName))); + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.remove(paths, MissingStrategy.IGNORE_MISSING_TARGET); + gcsUtil.removeBucket(BucketInfo.of(bucketName)); + } else { + gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList())); + gcsUtil.removeBucket(new Bucket().setName(bucketName)); + } + } catch (IOException e) { + } + } + + private void assertExists(GcsPath path) throws IOException { + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.getBlob(path); + } else { + gcsUtil.getObject(path); + } + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java index f344c6c2dba7..9512fec312cf 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java @@ -351,6 +351,7 @@ public void testSubPathError() { @Test public void testIsWildcard() { + assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/*"))); assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo*"))); assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo?"))); assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo[a-z]"))); @@ -359,6 +360,7 @@ public void testIsWildcard() { @Test public void testGetNonWildcardPrefix() { + assertEquals("gs://bucket/", GcsPath.getNonWildcardPrefix("gs://bucket/*")); assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo*")); assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo?")); assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo[a-z]")); From 959b573a0d5bfcdeabed84a1bdbb2a1c912b74cf Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 10 Feb 2026 16:02:14 -0500 Subject: [PATCH 03/11] Add deprecated annotations to V1 copy and remove. --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 15 +++++++++++++++ .../gcp/util/GcsUtilParameterizedIT.java | 14 ++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 4fd9de95eeea..2d7035e1a507 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -431,11 +431,20 @@ List makeGetBatches( return legacyBatch; } + /** + * @deprecated use {@link #copyV2(Iterable, Iterable)} or {@link #copy(Iterable, Iterable, + * OverwriteStrategy)}. + */ + @Deprecated public void copy(Iterable srcFilenames, Iterable destFilenames) throws IOException { delegate.copy(srcFilenames, destFilenames); } + public void copyV2(Iterable srcPaths, Iterable dstPaths) throws IOException { + copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); + } + public void copy( Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) throws IOException { @@ -478,10 +487,16 @@ List makeRemoveBatches(Collection filenames) return delegate.makeRemoveBatches(filenames); } + /** @deprecated use {@link #removeV2(Iterable)} or {@link #remove(Iterable, MissingStrategy)}. */ + @Deprecated public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } + public void removeV2(Iterable paths) throws IOException { + remove(paths, MissingStrategy.IGNORE_MISSING_TARGET); + } + public void remove(Iterable paths, MissingStrategy strategy) throws IOException { if (delegateV2 != null) { delegateV2.remove(paths, strategy); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index dab41ef4843a..1208ba027353 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -330,13 +330,13 @@ private void testCopyAndRemoveV2(String existingBucket, List srcPaths) srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, o.getObject())) .collect(Collectors.toList()); - gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); + gcsUtil.copyV2(srcPaths, dstPaths); assertExists(dstPaths.get(0)); assertExists(dstPaths.get(1)); // copy from existing files to an existing bucket // No exception on SAFE_OVERWRITE and UNSAFE_OVERWRITE - gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); + gcsUtil.copyV2(srcPaths, dstPaths); gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.UNSAFE_OVERWRITE); // raise exception on NO_OVERWRITE @@ -345,11 +345,11 @@ private void testCopyAndRemoveV2(String existingBucket, List srcPaths) () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.NO_OVERWRITE)); // remove the existing files - gcsUtil.remove(dstPaths, MissingStrategy.IGNORE_MISSING_TARGET); + gcsUtil.removeV2(dstPaths); // remove the already-deleted files // No exception on IGNORE_MISSING_TARGET - gcsUtil.remove(dstPaths, MissingStrategy.IGNORE_MISSING_TARGET); + gcsUtil.removeV2(dstPaths); // raise exception on FAIL_ON_MISSING_TARGET assertThrows( @@ -362,12 +362,10 @@ private void testCopyAndRemoveV2(String existingBucket, List srcPaths) .collect(Collectors.toList()); // copy from existing files to an nonexistent bucket. Raise exception. - assertThrows( - FileNotFoundException.class, - () -> gcsUtil.copy(srcPaths, wrongDstPaths, OverwriteStrategy.SAFE_OVERWRITE)); + assertThrows(FileNotFoundException.class, () -> gcsUtil.copyV2(srcPaths, wrongDstPaths)); // remove files from an nonexistent bucket. No exception. - gcsUtil.remove(wrongDstPaths, MissingStrategy.IGNORE_MISSING_TARGET); + gcsUtil.removeV2(wrongDstPaths); final List wrongSrcPaths = Arrays.asList(GcsPath.fromUri("gs://apache-beam-samples/shakespeare/some-random-name.txt")); From 8300a3f0fbba18b7a34e313a6f4792fb9c173038 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 12 Feb 2026 11:34:01 -0500 Subject: [PATCH 04/11] Refactor MissingStrategy and OverwriteStrategy enums. Add rewriteHelper() and move(). --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 12 ++- .../sdk/extensions/gcp/util/GcsUtilV2.java | 93 ++++++++++++++----- .../gcp/util/GcsUtilParameterizedIT.java | 16 ++-- 3 files changed, 90 insertions(+), 31 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 2d7035e1a507..171b8bbcba31 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -461,6 +461,16 @@ public void rename( delegate.rename(srcFilenames, destFilenames, moveOptions); } + public void renameV2( + Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) + throws IOException { + if (delegateV2 != null) { + delegateV2.move(srcPaths, dstPaths, moveOptions); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + @VisibleForTesting @SuppressWarnings("JdkObsolete") // for LinkedList java.util.LinkedList makeRewriteOps( @@ -494,7 +504,7 @@ public void remove(Collection filenames) throws IOException { } public void removeV2(Iterable paths) throws IOException { - remove(paths, MissingStrategy.IGNORE_MISSING_TARGET); + remove(paths, MissingStrategy.SKIP_IF_MISSING); } public void remove(Iterable paths, MissingStrategy strategy) throws IOException { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 3429ea93b98e..6e6841560dcf 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -46,14 +46,19 @@ import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; class GcsUtilV2 { @@ -274,8 +279,8 @@ public List expand(GcsPath gcsPattern) throws IOException { } public enum MissingStrategy { - IGNORE_MISSING_TARGET, - FAIL_ON_MISSING_TARGET, + FAIL_IF_MISSING, + SKIP_IF_MISSING, } public void remove(Iterable paths, MissingStrategy strategy) throws IOException { @@ -296,7 +301,7 @@ public void remove(Iterable paths, MissingStrategy strategy) throws IOE try { Boolean deleted = future.get(); if (!deleted) { - if (strategy == MissingStrategy.FAIL_ON_MISSING_TARGET) { + if (strategy == MissingStrategy.FAIL_IF_MISSING) { throw new FileNotFoundException( String.format( "The specified file does not exist: %s", pathPartition.get(i).toString())); @@ -312,13 +317,18 @@ public void remove(Iterable paths, MissingStrategy strategy) throws IOE } public enum OverwriteStrategy { - NO_OVERWRITE, // Fail if target exists + FAIL_IF_EXISTS, // Fail if target exists + SKIP_IF_EXISTS, // Skip if target exists SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic) - UNSAFE_OVERWRITE // Overwrite regardless of state + ALWAYS_OVERWRITE // Overwrite regardless of state } - public void copy( - Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) + private void rewriteHelper( + Iterable srcPaths, + Iterable dstPaths, + boolean deleteSrc, + MissingStrategy srcMissing, + OverwriteStrategy dstOverwrite) throws IOException { List srcList = Lists.newArrayList(srcPaths); List dstList = Lists.newArrayList(dstPaths); @@ -337,44 +347,83 @@ public void copy( CopyRequest.Builder copyRequestBuilder = CopyRequest.newBuilder() .setSource(srcId) - .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK); // 2GiB + .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK); - if (strategy == OverwriteStrategy.UNSAFE_OVERWRITE) { + if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) { copyRequestBuilder.setTarget(dstId); } else { - // Both NO_OVERWRITE and SAFE_OVERWRITE require checking the existing blob - BlobInfo existingTarget = null; + // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking the target blob + BlobInfo existingTarget; try { existingTarget = storage.get(dstId); } catch (StorageException e) { throw translateStorageException(dstPath, e); } - Storage.BlobTargetOption precondition; if (existingTarget == null) { - precondition = Storage.BlobTargetOption.doesNotExist(); - } else if (strategy == OverwriteStrategy.SAFE_OVERWRITE) { - // SAFE_OVERWRITE: match the generation of the existing object - precondition = Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()); + copyRequestBuilder.setTarget(dstId, Storage.BlobTargetOption.doesNotExist()); } else { - // Target exists and we aren't allowed to overwrite (strategy == NO_OVERWRITE) - throw new FileAlreadyExistsException( - srcPath.toString(), - dstPath.toString(), - "Target object already exists and CopyStrategy is NO_OVERWRITE"); + switch (dstOverwrite) { + case SKIP_IF_EXISTS: + LOG.warn("Ignoring rewriting from {} to {} because target exists.", srcPath, dstPath); + continue; // Skip to next file in for-loop + + case SAFE_OVERWRITE: + copyRequestBuilder.setTarget( + dstId, Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration())); + break; + + case FAIL_IF_EXISTS: + default: + throw new FileAlreadyExistsException( + srcPath.toString(), + dstPath.toString(), + "Target object already exists and strategy is FAIL_IF_EXISTS"); + } } - copyRequestBuilder.setTarget(dstId, precondition); } try { CopyWriter copyWriter = storage.copy(copyRequestBuilder.build()); copyWriter.getResult(); + + if (deleteSrc) { + storage.get(srcId).delete(); + } } catch (StorageException e) { throw translateStorageException(srcPath, e); } } } + public void copy( + Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) + throws IOException { + rewriteHelper(srcPaths, dstPaths, false, MissingStrategy.FAIL_IF_MISSING, strategy); + } + + public void move( + Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) + throws IOException { + Set moveOptionSet = Sets.newHashSet(moveOptions); + final MissingStrategy srcMissing; + final OverwriteStrategy dstOverwrite; + + if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) { + srcMissing = MissingStrategy.SKIP_IF_MISSING; + } else { + srcMissing = MissingStrategy.FAIL_IF_MISSING; + } + + if (moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) { + dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS; + } else { + dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE; + } + + rewriteHelper(srcPaths, dstPaths, true, srcMissing, dstOverwrite); + } + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException { String bucketName = path.getBucket(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 1208ba027353..b09cf3bd426c 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -335,26 +335,26 @@ private void testCopyAndRemoveV2(String existingBucket, List srcPaths) assertExists(dstPaths.get(1)); // copy from existing files to an existing bucket - // No exception on SAFE_OVERWRITE and UNSAFE_OVERWRITE + // No exception on SAFE_OVERWRITE and ALWAYS_OVERWRITE gcsUtil.copyV2(srcPaths, dstPaths); - gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.UNSAFE_OVERWRITE); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE); - // raise exception on NO_OVERWRITE + // raise exception on FAIL_IF_EXISTS assertThrows( FileAlreadyExistsException.class, - () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.NO_OVERWRITE)); + () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.FAIL_IF_EXISTS)); // remove the existing files gcsUtil.removeV2(dstPaths); // remove the already-deleted files - // No exception on IGNORE_MISSING_TARGET + // No exception on IGNORE_IF_MISSING gcsUtil.removeV2(dstPaths); - // raise exception on FAIL_ON_MISSING_TARGET + // raise exception on FAIL_IF_MISSING assertThrows( FileNotFoundException.class, - () -> gcsUtil.remove(dstPaths, MissingStrategy.FAIL_ON_MISSING_TARGET)); + () -> gcsUtil.remove(dstPaths, MissingStrategy.FAIL_IF_MISSING)); final List wrongDstPaths = srcPaths.stream() @@ -433,7 +433,7 @@ private void tearDownBucketHelper(String bucketName) { List paths = gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**", bucketName))); if (experiment.equals("use_gcsutil_v2")) { - gcsUtil.remove(paths, MissingStrategy.IGNORE_MISSING_TARGET); + gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING); gcsUtil.removeBucket(BucketInfo.of(bucketName)); } else { gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList())); From f1ec2b2ba67ad614278bde91ff65cd33fd1604dd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 12 Feb 2026 13:22:12 -0500 Subject: [PATCH 05/11] Add rename tests and refacor copy and remove tests. --- .../sdk/extensions/gcp/util/GcsUtilV2.java | 5 + .../gcp/util/GcsUtilParameterizedIT.java | 349 ++++++++++++------ 2 files changed, 250 insertions(+), 104 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 6e6841560dcf..064636857911 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -391,6 +391,11 @@ private void rewriteHelper( storage.get(srcId).delete(); } } catch (StorageException e) { + if (e.getCode() == 404 && srcMissing == MissingStrategy.SKIP_IF_MISSING) { + LOG.warn( + "Ignoring rewriting from {} to {} because source does not exist.", srcPath, dstPath); + continue; + } throw translateStorageException(srcPath, e); } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index b09cf3bd426c..9c39314c3ef0 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; @@ -300,146 +301,278 @@ public void testCreateAndRemoveBucket() throws IOException { } } - @Test - public void testCopyAndRemove() throws IOException { - String existingBucket = "apache-beam-temp-bucket-12345"; - List srcPaths = + private List createTestBucketHelper(String bucketName) throws IOException { + final List originPaths = Arrays.asList( GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt")); - // create a bucket first + final List testPaths = + originPaths.stream() + .map(o -> GcsPath.fromComponents(bucketName, o.getObject())) + .collect(Collectors.toList()); + + // create bucket and copy some initial files into there + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.createBucket(BucketInfo.of(bucketName)); + + gcsUtil.copyV2(originPaths, testPaths); + } else { + GcsOptions gcsOptions = options.as(GcsOptions.class); + gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); + + final List originList = + originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List testList = + testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + gcsUtil.copy(originList, testList); + } + + return testPaths; + } + + private void tearDownTestBucketHelper(String bucketName) { try { - createBucketHelper(existingBucket); + // use "**" in the pattern to match any characters including "/". + final List paths = + gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**", bucketName))); if (experiment.equals("use_gcsutil_v2")) { - testCopyAndRemoveV2(existingBucket, srcPaths); + gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING); + gcsUtil.removeBucket(BucketInfo.of(bucketName)); } else { - testCopyAndRemoveV1(existingBucket, srcPaths); + gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList())); + gcsUtil.removeBucket(new Bucket().setName(bucketName)); } } catch (IOException e) { - throw e; - } finally { - tearDownBucketHelper(existingBucket); } } - private void testCopyAndRemoveV2(String existingBucket, List srcPaths) - throws IOException { - String nonExistentBucket = "my-random-test-bucket-12345"; - List dstPaths = - srcPaths.stream() - .map(o -> GcsPath.fromComponents(existingBucket, o.getObject())) - .collect(Collectors.toList()); - gcsUtil.copyV2(srcPaths, dstPaths); - assertExists(dstPaths.get(0)); - assertExists(dstPaths.get(1)); - - // copy from existing files to an existing bucket - // No exception on SAFE_OVERWRITE and ALWAYS_OVERWRITE - gcsUtil.copyV2(srcPaths, dstPaths); - gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE); - - // raise exception on FAIL_IF_EXISTS - assertThrows( - FileAlreadyExistsException.class, - () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.FAIL_IF_EXISTS)); - - // remove the existing files - gcsUtil.removeV2(dstPaths); - - // remove the already-deleted files - // No exception on IGNORE_IF_MISSING - gcsUtil.removeV2(dstPaths); - - // raise exception on FAIL_IF_MISSING - assertThrows( - FileNotFoundException.class, - () -> gcsUtil.remove(dstPaths, MissingStrategy.FAIL_IF_MISSING)); - - final List wrongDstPaths = - srcPaths.stream() - .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) - .collect(Collectors.toList()); + @Test + public void testCopy() throws IOException { + final String existingBucket = "apache-beam-temp-bucket-12345"; + final String nonExistentBucket = "my-random-test-bucket-12345"; - // copy from existing files to an nonexistent bucket. Raise exception. - assertThrows(FileNotFoundException.class, () -> gcsUtil.copyV2(srcPaths, wrongDstPaths)); + try { + final List srcPaths = createTestBucketHelper(existingBucket); + final List dstPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) + .collect(Collectors.toList()); + final List errPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + assertNotExists(dstPaths.get(0)); + assertNotExists(dstPaths.get(1)); - // remove files from an nonexistent bucket. No exception. - gcsUtil.removeV2(wrongDstPaths); + if (experiment.equals("use_gcsutil_v2")) { + // (1) when the target files do not exist + gcsUtil.copyV2(srcPaths, dstPaths); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the target files exist + // (2a) no exception on SAFE_OVERWRITE, ALWAYS_OVERWRITE, SKIP_IF_EXISTS + gcsUtil.copyV2(srcPaths, dstPaths); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SKIP_IF_EXISTS); + + // (2b) raise exception on FAIL_IF_EXISTS + assertThrows( + FileAlreadyExistsException.class, + () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.FAIL_IF_EXISTS)); - final List wrongSrcPaths = - Arrays.asList(GcsPath.fromUri("gs://apache-beam-samples/shakespeare/some-random-name.txt")); + // (3) raise exception when the target bucket is nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copyV2(srcPaths, errPaths)); - // missing source file. Raise exception. - assertThrows( - FileNotFoundException.class, - () -> gcsUtil.copy(wrongSrcPaths, wrongSrcPaths, OverwriteStrategy.SAFE_OVERWRITE)); + // (4) raise exception when the source files are nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copyV2(errPaths, dstPaths)); + } else { + final List srcList = + srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List dstList = + dstPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List errList = + errPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + + // (1) when the target files do not exist + gcsUtil.copy(srcList, dstList); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the target files exist, no exception + gcsUtil.copy(srcList, dstList); + + // (3) raise exception when the target bucket is nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList, errList)); + + // (4) raise exception when the source files are nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(errList, dstList)); + } + } catch (IOException e) { + throw e; + } finally { + tearDownTestBucketHelper(existingBucket); + } } - private void testCopyAndRemoveV1(String existingBucket, List srcPaths) - throws IOException { - String nonExistentBucket = "my-random-test-bucket-12345"; - List srcList = srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + @Test + public void testRemove() throws IOException { + final String existingBucket = "apache-beam-temp-bucket-12345"; + final String nonExistentBucket = "my-random-test-bucket-12345"; - // copy from existing files to an existing bucket - List dstList = - srcPaths.stream() - .map(o -> String.format("gs://%s/%s", existingBucket, o.getObject())) - .collect(Collectors.toList()); - gcsUtil.copy(srcList, dstList); + try { + final List srcPaths = createTestBucketHelper(existingBucket); + final List errPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); - assertExists(GcsPath.fromUri(dstList.get(0))); - assertExists(GcsPath.fromUri(dstList.get(1))); + assertExists(srcPaths.get(0)); + assertExists(srcPaths.get(1)); - // copy from existing files to an existing bucket, but target files exist. No exception. - gcsUtil.copy(srcList, dstList); + if (experiment.equals("use_gcsutil_v2")) { + // (1) when the files to remove exist + gcsUtil.removeV2(srcPaths); + assertNotExists(srcPaths.get(0)); + assertNotExists(srcPaths.get(1)); - // remove the existing files - gcsUtil.remove(dstList); + // (2) when the files to remove have been deleted + // (2a) no exception on SKIP_IF_MISSING + gcsUtil.removeV2(srcPaths); + gcsUtil.remove(srcPaths, MissingStrategy.SKIP_IF_MISSING); - // remove the already-deleted files. No exception. - gcsUtil.remove(dstList); + // (2b) raise exception on FAIL_IF_MISSING + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.remove(srcPaths, MissingStrategy.FAIL_IF_MISSING)); - final List wrongDstList = - srcPaths.stream() - .map(o -> String.format("gs://%s/%s", nonExistentBucket, o.getObject())) - .collect(Collectors.toList()); + // (3) when the files are from an nonexistent bucket + // (3a) no exception on SKIP_IF_MISSING + gcsUtil.removeV2(errPaths); + gcsUtil.remove(errPaths, MissingStrategy.SKIP_IF_MISSING); - // copy from existing files to an nonexistent bucket. Raise exception. - assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList, wrongDstList)); + // (3b) raise exception on FAIL_IF_MISSING + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.remove(errPaths, MissingStrategy.FAIL_IF_MISSING)); + } else { + final List srcList = + srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List errList = + errPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - // remove files from an nonexistent bucket. No exception. - gcsUtil.remove(dstList); + // (1) when the files to remove exist + gcsUtil.remove(srcList); + assertNotExists(srcPaths.get(0)); + assertNotExists(srcPaths.get(1)); - final List wrongSrcList = - Arrays.asList("gs://apache-beam-samples/shakespeare/some-random-name.txt"); + // (2) when the files to remove have been deleted, no exception + gcsUtil.remove(srcList); - // missing source file, Raise exception - assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(wrongSrcList, wrongSrcList)); - } - - private void createBucketHelper(String bucketName) throws IOException { - if (experiment.equals("use_gcsutil_v2")) { - gcsUtil.createBucket(BucketInfo.of(bucketName)); - } else { - GcsOptions gcsOptions = options.as(GcsOptions.class); - gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); + // (3) when the files are from an nonexistent bucket, no exception + gcsUtil.remove(errList); + } + } catch (IOException e) { + throw e; + } finally { + tearDownTestBucketHelper(existingBucket); } } - private void tearDownBucketHelper(String bucketName) { + @Test + public void testRename() throws IOException { + final String existingBucket = "apache-beam-temp-bucket-12345"; + final String nonExistentBucket = "my-random-test-bucket-12345"; + try { - // Use "**" in the pattern to match any characters including "/". - List paths = - gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**", bucketName))); + final List srcPaths = createTestBucketHelper(existingBucket); + final List tmpPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + o.getObject())) + .collect(Collectors.toList()); + final List dstPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) + .collect(Collectors.toList()); + final List errPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + assertNotExists(dstPaths.get(0)); + assertNotExists(dstPaths.get(1)); if (experiment.equals("use_gcsutil_v2")) { - gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING); - gcsUtil.removeBucket(BucketInfo.of(bucketName)); + // Make a copy of sources + gcsUtil.copyV2(srcPaths, tmpPaths); + + // (1) when the source files exist and target files do not + gcsUtil.renameV2(tmpPaths, dstPaths); + assertNotExists(tmpPaths.get(0)); + assertNotExists(tmpPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the source files do not exist + // (2a) no exception if IGNORE_MISSING_FILES is set + gcsUtil.renameV2(errPaths, dstPaths, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + + // (2b) raise exception if if IGNORE_MISSING_FILES is not set + assertThrows(FileNotFoundException.class, () -> gcsUtil.renameV2(errPaths, dstPaths)); + + // (3) when both source files and target files exist + gcsUtil.renameV2( + srcPaths, dstPaths, MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + gcsUtil.renameV2(srcPaths, dstPaths); } else { - gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList())); - gcsUtil.removeBucket(new Bucket().setName(bucketName)); + final List srcList = + srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List tmpList = + tmpPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List dstList = + dstPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List errList = + errPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + + // Make a copy of sources + gcsUtil.copy(srcList, tmpList); + + // (1) when the source files exist and target files do not + gcsUtil.rename(tmpList, dstList); + assertNotExists(tmpPaths.get(0)); + assertNotExists(tmpPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the source files do not exist + // (2a) no exception if IGNORE_MISSING_FILES is set + gcsUtil.rename(errList, dstList, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + + // (2b) raise exception if if IGNORE_MISSING_FILES is not set + assertThrows(FileNotFoundException.class, () -> gcsUtil.rename(errList, dstList)); + + // (3) when both source files and target files exist + assertExists(srcPaths.get(0)); + assertExists(srcPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // There is a bug in V1 where SKIP_IF_DESTINATION_EXISTS is not honored. + gcsUtil.rename( + srcList, dstList, MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + + assertNotExists(srcPaths.get(0)); // BUG! The renaming is supposed to be skipped + assertNotExists(srcPaths.get(1)); // BUG! The renaming is supposed to be skipped + // assertExists(srcPaths.get(0)); + // assertExists(srcPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); } } catch (IOException e) { + throw e; + } finally { + tearDownTestBucketHelper(existingBucket); } } @@ -450,4 +583,12 @@ private void assertExists(GcsPath path) throws IOException { gcsUtil.getObject(path); } } + + private void assertNotExists(GcsPath path) throws IOException { + if (experiment.equals("use_gcsutil_v2")) { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(path)); + } else { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path)); + } + } } From 4be259fffef0a8ff3e5dd6a4639d6b4c1d6ce24e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 12 Feb 2026 15:44:41 -0500 Subject: [PATCH 06/11] Refactor rename --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 34 ++++++++++++++++++- .../sdk/extensions/gcp/util/GcsUtilV2.java | 26 +++----------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 171b8bbcba31..9ba32dd8fd5e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; @@ -46,10 +47,12 @@ import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; public class GcsUtil { @@ -464,8 +467,37 @@ public void rename( public void renameV2( Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) throws IOException { + Set moveOptionSet = Sets.newHashSet(moveOptions); + final MissingStrategy srcMissing; + final OverwriteStrategy dstOverwrite; + + if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) { + srcMissing = MissingStrategy.SKIP_IF_MISSING; + } else { + srcMissing = MissingStrategy.FAIL_IF_MISSING; + } + + if (moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) { + dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS; + } else { + dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE; + } + + if (delegateV2 != null) { + delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + + public void rename( + Iterable srcPaths, + Iterable dstPaths, + MissingStrategy srcMissing, + OverwriteStrategy dstOverwrite) + throws IOException { if (delegateV2 != null) { - delegateV2.move(srcPaths, dstPaths, moveOptions); + delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite); } else { throw new IOException("GcsUtil V2 not initialized."); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index 064636857911..ac568955cb9a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -46,19 +46,14 @@ import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; -import org.apache.beam.sdk.io.fs.MoveOptions; -import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; class GcsUtilV2 { @@ -408,24 +403,11 @@ public void copy( } public void move( - Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) + Iterable srcPaths, + Iterable dstPaths, + MissingStrategy srcMissing, + OverwriteStrategy dstOverwrite) throws IOException { - Set moveOptionSet = Sets.newHashSet(moveOptions); - final MissingStrategy srcMissing; - final OverwriteStrategy dstOverwrite; - - if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) { - srcMissing = MissingStrategy.SKIP_IF_MISSING; - } else { - srcMissing = MissingStrategy.FAIL_IF_MISSING; - } - - if (moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) { - dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS; - } else { - dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE; - } - rewriteHelper(srcPaths, dstPaths, true, srcMissing, dstOverwrite); } From 4adf421da89fd31bad343e39a213964a439d5f15 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 12 Feb 2026 15:47:10 -0500 Subject: [PATCH 07/11] Add experimental annotations to the new copy, remove and rename --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 9ba32dd8fd5e..d10a01de2aff 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -434,20 +434,17 @@ List makeGetBatches( return legacyBatch; } - /** - * @deprecated use {@link #copyV2(Iterable, Iterable)} or {@link #copy(Iterable, Iterable, - * OverwriteStrategy)}. - */ - @Deprecated public void copy(Iterable srcFilenames, Iterable destFilenames) throws IOException { delegate.copy(srcFilenames, destFilenames); } + /** experimental */ public void copyV2(Iterable srcPaths, Iterable dstPaths) throws IOException { copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); } + /** experimental */ public void copy( Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) throws IOException { @@ -464,6 +461,7 @@ public void rename( delegate.rename(srcFilenames, destFilenames, moveOptions); } + /** experimental */ public void renameV2( Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) throws IOException { @@ -490,6 +488,7 @@ public void renameV2( } } + /** experimental */ public void rename( Iterable srcPaths, Iterable dstPaths, @@ -529,16 +528,16 @@ List makeRemoveBatches(Collection filenames) return delegate.makeRemoveBatches(filenames); } - /** @deprecated use {@link #removeV2(Iterable)} or {@link #remove(Iterable, MissingStrategy)}. */ - @Deprecated public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } + /** experimental */ public void removeV2(Iterable paths) throws IOException { remove(paths, MissingStrategy.SKIP_IF_MISSING); } + /** experimental */ public void remove(Iterable paths, MissingStrategy strategy) throws IOException { if (delegateV2 != null) { delegateV2.remove(paths, strategy); From b4c7ed077cafd1858e7681fbc58d42eda0a1bfa8 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Feb 2026 10:01:31 -0500 Subject: [PATCH 08/11] Remove unused import. --- .../java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index d10a01de2aff..c5ed70d6771e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -41,7 +41,6 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV1.StorageObjectOrIOException; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; From 5682df7e50fe601a4ad9695721094a1998a444ad Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Feb 2026 10:22:59 -0500 Subject: [PATCH 09/11] Fix style. --- .../apache/beam/sdk/extensions/gcp/util/GcsUtil.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index c5ed70d6771e..205fd960488f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -438,12 +438,12 @@ public void copy(Iterable srcFilenames, Iterable destFilenames) delegate.copy(srcFilenames, destFilenames); } - /** experimental */ + /** experimental api. */ public void copyV2(Iterable srcPaths, Iterable dstPaths) throws IOException { copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); } - /** experimental */ + /** experimental api. */ public void copy( Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) throws IOException { @@ -460,7 +460,7 @@ public void rename( delegate.rename(srcFilenames, destFilenames, moveOptions); } - /** experimental */ + /** experimental api. */ public void renameV2( Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) throws IOException { @@ -487,7 +487,7 @@ public void renameV2( } } - /** experimental */ + /** experimental api. */ public void rename( Iterable srcPaths, Iterable dstPaths, @@ -531,12 +531,12 @@ public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } - /** experimental */ + /** experimental api. */ public void removeV2(Iterable paths) throws IOException { remove(paths, MissingStrategy.SKIP_IF_MISSING); } - /** experimental */ + /** experimental api. */ public void remove(Iterable paths, MissingStrategy strategy) throws IOException { if (delegateV2 != null) { delegateV2.remove(paths, strategy); From 5251ef00a1361789ec3180c1ab82af03c0f004f7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Feb 2026 10:24:33 -0500 Subject: [PATCH 10/11] Trigger post commit java for the integration tests of GcsUtil. --- .github/trigger_files/beam_PostCommit_Java.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 1bd74515152c..855f48a055f8 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 5 } \ No newline at end of file From b50a5bf84d06790b8c1744c985589bb56cbea596 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 18 Feb 2026 11:18:11 -0500 Subject: [PATCH 11/11] Revise according to reviews. --- .../apache/beam/sdk/extensions/gcp/util/GcsUtil.java | 6 +----- .../beam/sdk/extensions/gcp/util/GcsUtilV2.java | 11 +++++++++-- .../extensions/gcp/util/GcsUtilParameterizedIT.java | 8 ++------ 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 205fd960488f..33399ef87b63 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -480,11 +480,7 @@ public void renameV2( dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE; } - if (delegateV2 != null) { - delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite); - } else { - throw new IOException("GcsUtil V2 not initialized."); - } + rename(srcPaths, dstPaths, srcMissing, dstOverwrite); } /** experimental api. */ diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index ac568955cb9a..b00b7ce0d728 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -369,11 +369,12 @@ private void rewriteHelper( break; case FAIL_IF_EXISTS: - default: throw new FileAlreadyExistsException( srcPath.toString(), dstPath.toString(), "Target object already exists and strategy is FAIL_IF_EXISTS"); + default: + throw new IllegalStateException("Unknown OverwriteStrategy: " + dstOverwrite); } } } @@ -383,7 +384,13 @@ private void rewriteHelper( copyWriter.getResult(); if (deleteSrc) { - storage.get(srcId).delete(); + if (!storage.delete(srcId)) { + // This may happen if the source file is deleted by another process after copy. + LOG.warn( + "Source file {} could not be deleted after move to {}. It may not have existed.", + srcPath, + dstPath); + } } } catch (StorageException e) { if (e.getCode() == 404 && srcMissing == MissingStrategy.SKIP_IF_MISSING) { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 9c39314c3ef0..80ffd72924fa 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -344,6 +344,8 @@ private void tearDownTestBucketHelper(String bucketName) { gcsUtil.removeBucket(new Bucket().setName(bucketName)); } } catch (IOException e) { + System.err.println( + "Error during tear down of test bucket " + bucketName + ": " + e.getMessage()); } } @@ -410,8 +412,6 @@ public void testCopy() throws IOException { // (4) raise exception when the source files are nonexistent. assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(errList, dstList)); } - } catch (IOException e) { - throw e; } finally { tearDownTestBucketHelper(existingBucket); } @@ -474,8 +474,6 @@ public void testRemove() throws IOException { // (3) when the files are from an nonexistent bucket, no exception gcsUtil.remove(errList); } - } catch (IOException e) { - throw e; } finally { tearDownTestBucketHelper(existingBucket); } @@ -569,8 +567,6 @@ public void testRename() throws IOException { assertExists(dstPaths.get(0)); assertExists(dstPaths.get(1)); } - } catch (IOException e) { - throw e; } finally { tearDownTestBucketHelper(existingBucket); }