From 4f60c59a3a1830224537212aae4b380759031492 Mon Sep 17 00:00:00 2001 From: Eliaaazzz Date: Tue, 20 Jan 2026 18:14:12 +1100 Subject: [PATCH 1/4] Fix DoFnInvoker cache collision for generic types This fixes a bug where ByteBuddyDoFnInvokerFactory would return the same cached invoker for different generic instantiations of the same DoFn class. Changes: 1. Introduced InvokerCacheKey with TypeDescriptors to ensure unique cache entries. 2. Updated generateInvokerClass to append type-based hash suffix. 3. Added regression test (testCacheKeyCollisionProof). Fixes #37351 --- .../reflect/ByteBuddyDoFnInvokerFactory.java | 109 +++++++++++++++--- .../transforms/reflect/DoFnInvokersTest.java | 57 ++++++++- 2 files changed, 147 insertions(+), 19 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 3afd8aeb5e90..7825129d12c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -166,15 +166,67 @@ public DoFnInvoker invokerFor(DoFn> fnClass; + private final TypeDescriptor inputType; + private final TypeDescriptor outputType; + + InvokerCacheKey( + Class> fnClass, + TypeDescriptor inputType, + TypeDescriptor outputType) { + this.fnClass = fnClass; + this.inputType = inputType; + this.outputType = outputType; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof InvokerCacheKey)) { + return false; + } + InvokerCacheKey that = (InvokerCacheKey) o; + return fnClass.equals(that.fnClass) + && inputType.equals(that.inputType) + && outputType.equals(that.outputType); + } + + @Override + public int hashCode() { + int result = fnClass.hashCode(); + result = 31 * result + 
inputType.hashCode(); + result = 31 * result + outputType.hashCode(); + return result; + } + + @Override + public String toString() { + return String.format( + "InvokerCacheKey{fnClass=%s, inputType=%s, outputType=%s}", + fnClass.getName(), inputType, outputType); + } + } + + /** + * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class + * and its generic type parameters. Needed because generating an invoker class is expensive, and + * to avoid generating an excessive number of classes consuming PermGen memory. + * + *

The cache key includes generic type information to prevent collisions when the same DoFn + * class is used with different generic types (e.g., MyDoFn<String> vs + * MyDoFn<Integer>). * *

Note that special care must be taken to enumerate this object as concurrent hash maps are inputType = fn.getInputTypeDescriptor(); + if (inputType == null) { + inputType = (TypeDescriptor) TypeDescriptor.of(Object.class); + } + @SuppressWarnings("unchecked") + TypeDescriptor outputType = fn.getOutputTypeDescriptor(); + if (outputType == null) { + outputType = (TypeDescriptor) TypeDescriptor.of(Object.class); + } + try { @SuppressWarnings("unchecked") DoFnInvokerBase> invoker = (DoFnInvokerBase>) - getByteBuddyInvokerConstructor(signature).newInstance(fn); + getByteBuddyInvokerConstructor(signature, inputType, outputType).newInstance(fn); if (signature.onTimerMethods() != null) { for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { @@ -297,19 +362,24 @@ public DoFnInvoker newByteBuddyInvoker( } /** - * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class. + * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFnSignature} + * and specific generic types. * *

These are cached such that at most one {@link DoFnInvoker} class exists for a given {@link - * DoFn} class. + * DoFn} class with specific generic type parameters. Different generic instantiations of the same + * DoFn class will have separate cached invoker classes. */ - private Constructor getByteBuddyInvokerConstructor(DoFnSignature signature) { + private Constructor getByteBuddyInvokerConstructor( + DoFnSignature signature, TypeDescriptor inputType, TypeDescriptor outputType) { Class> fnClass = signature.fnClass(); + InvokerCacheKey cacheKey = new InvokerCacheKey(fnClass, inputType, outputType); return byteBuddyInvokerConstructorCache.computeIfAbsent( - fnClass, - clazz -> { - Class> invokerClass = generateInvokerClass(signature); + cacheKey, + key -> { + Class> invokerClass = + generateInvokerClass(signature, inputType, outputType); try { - return invokerClass.getConstructor(clazz); + return invokerClass.getConstructor(fnClass); } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { throw new RuntimeException(e); } @@ -457,18 +527,25 @@ public static double validateSize(double size) { } /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ - private static Class> generateInvokerClass(DoFnSignature signature) { + private static Class> generateInvokerClass( + DoFnSignature signature, TypeDescriptor inputType, TypeDescriptor outputType) { Class> fnClass = signature.fnClass(); + // Create a unique suffix based on the type descriptors to avoid class name collisions + // when the same DoFn class is used with different generic types. 
+ String typeSuffix = + String.format( + "%s$%08x", + DoFnInvoker.class.getSimpleName(), + (inputType.toString() + "|" + outputType.toString()).hashCode()); + final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); DynamicType.Builder builder = new ByteBuddy() // Create subclasses inside the target class, to have access to // private and package-private bits - .with( - StableInvokerNamingStrategy.forDoFnClass(fnClass) - .withSuffix(DoFnInvoker.class.getSimpleName())) + .with(StableInvokerNamingStrategy.forDoFnClass(fnClass).withSuffix(typeSuffix)) // class extends DoFnInvokerBase { .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 299c5d5c5906..679c26cf7e9f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; @@ -77,6 +78,8 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.OutputBuilder; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.WindowedValues; import org.joda.time.Instant; import org.junit.Before; @@ -1382,11 +1385,18 @@ public void process() {} @Test public void testStableName() { DoFnInvoker invoker = DoFnInvokers.invokerFor(new StableNameTestDoFn()); + 
// The invoker class name includes a hash of the type descriptors to support + // different generic instantiations of the same DoFn class. + // Format: $$ + TypeDescriptor voidType = new StableNameTestDoFn().getInputTypeDescriptor(); + String expectedTypeSuffix = + String.format( + "%s$%08x", + DoFnInvoker.class.getSimpleName(), + (voidType.toString() + "|" + voidType.toString()).hashCode()); assertThat( invoker.getClass().getName(), - equalTo( - String.format( - "%s$%s", StableNameTestDoFn.class.getName(), DoFnInvoker.class.getSimpleName()))); + equalTo(String.format("%s$%s", StableNameTestDoFn.class.getName(), expectedTypeSuffix))); } @Test @@ -1406,4 +1416,45 @@ public void processElement(BundleFinalizer bundleFinalizer) { verify(mockBundleFinalizer).afterBundleCommit(eq(Instant.ofEpochSecond(42L)), eq(null)); } + + @Test + public void testCacheKeyCollisionProof() throws Exception { + class DynamicTypeDoFn extends DoFn { + private final TypeDescriptor typeDescriptor; + + DynamicTypeDoFn(TypeDescriptor typeDescriptor) { + this.typeDescriptor = typeDescriptor; + } + + @ProcessElement + public void processElement(@Element T element, OutputReceiver out) { + out.output(element); + } + + // Key point: force returning our specified type instead of relying on class signature + @Override + public TypeDescriptor getInputTypeDescriptor() { + return typeDescriptor; + } + + @Override + public TypeDescriptor getOutputTypeDescriptor() { + return typeDescriptor; + } + } + + DoFn stringFn = new DynamicTypeDoFn<>(TypeDescriptors.strings()); + DoFn intFn = new DynamicTypeDoFn<>(TypeDescriptors.integers()); + + DoFnInvoker stringInvoker = DoFnInvokers.invokerFor(stringFn); + DoFnInvoker intInvoker = DoFnInvokers.invokerFor(intFn); + + System.out.println("String Invoker: " + stringInvoker.getClass().getName()); + System.out.println("Integer Invoker: " + intInvoker.getClass().getName()); + + assertNotSame( + "Critical bug: Beam returned the same cached class for different generic 
types.", + stringInvoker.getClass(), + intInvoker.getClass()); + } } From f0271f0e43890c3ee140324be5c4711544f5fa31 Mon Sep 17 00:00:00 2001 From: Eliaaazzz Date: Tue, 20 Jan 2026 19:51:54 +1100 Subject: [PATCH 2/4] [Java] Fix DoFnInvoker cache collision for generic types (with fallback) This PR fixes a critical issue where ByteBuddyDoFnInvokerFactory failed to distinguish between different generic instantiations of the same DoFn class (e.g., MyFn<String> vs MyFn<Integer>). 1. Cache Key Strategy: Introduced InvokerCacheKey to include input/output TypeDescriptors in the cache lookup. 2. Class Naming: Updated generateInvokerClass to append a type-based hash suffix to ensure unique class names. 3. Robustness (The Fix): Added defensive try-catch blocks when accessing TypeDescriptors. - Some internal transforms (like MapElements) throw IllegalStateException if getOutputTypeDescriptor() is called after serialization. - In these cases, the factory now gracefully falls back to using Object.class (legacy behavior), ensuring backward compatibility for transforms that do not retain type information at runtime.
Fixes #37351 --- .../reflect/ByteBuddyDoFnInvokerFactory.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 7825129d12c3..98a041d8a7f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -318,14 +318,28 @@ public DoFnInvoker newByteBuddyInvoker( fn.getClass()); // Extract input and output type descriptors from the DoFn instance - // Fall back to Object.class if the type descriptors are null (e.g., for mocked DoFn instances) - @SuppressWarnings("unchecked") - TypeDescriptor inputType = fn.getInputTypeDescriptor(); + // Fall back to Object.class if the type descriptors are null or unavailable (e.g., MapElements + // after serialization) + TypeDescriptor inputType; + try { + inputType = fn.getInputTypeDescriptor(); + } catch (Exception e) { + // Some DoFns (like MapElements) throw IllegalStateException if queried after + // serialization. + // In this case, we fall back to the raw class behavior (Object). + inputType = null; + } if (inputType == null) { inputType = (TypeDescriptor) TypeDescriptor.of(Object.class); } - @SuppressWarnings("unchecked") - TypeDescriptor outputType = fn.getOutputTypeDescriptor(); + + TypeDescriptor outputType; + try { + outputType = fn.getOutputTypeDescriptor(); + } catch (Exception e) { + // Same as above: fall back to Object if type info is unavailable. 
+ outputType = null; + } if (outputType == null) { outputType = (TypeDescriptor) TypeDescriptor.of(Object.class); } From 648ab3ca12d9ad26be65162bbee6b9c388ee5909 Mon Sep 17 00:00:00 2001 From: Eliaaazzz Date: Sat, 24 Jan 2026 13:01:59 +1100 Subject: [PATCH 3/4] Address review comments: Refactor ByteBuddyDoFnInvokerFactory methods - Replace String.format in toString() with MoreObjects.toStringHelper for consistency. - Update hashCode() and equals() to use java.util.Objects utility methods. - Extract type suffix generation logic into a reusable static method to avoid duplication in tests. --- .../reflect/ByteBuddyDoFnInvokerFactory.java | 54 ++++++++++++------- .../transforms/reflect/DoFnInvokersTest.java | 6 +-- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 98a041d8a7f2..6a2b7fe5f1fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import net.bytebuddy.ByteBuddy; import net.bytebuddy.description.field.FieldDescription; @@ -106,6 +107,7 @@ import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ 
-192,24 +194,23 @@ public boolean equals(@Nullable Object o) { return false; } InvokerCacheKey that = (InvokerCacheKey) o; - return fnClass.equals(that.fnClass) - && inputType.equals(that.inputType) - && outputType.equals(that.outputType); + return Objects.equals(fnClass, that.fnClass) + && Objects.equals(inputType, that.inputType) + && Objects.equals(outputType, that.outputType); } @Override public int hashCode() { - int result = fnClass.hashCode(); - result = 31 * result + inputType.hashCode(); - result = 31 * result + outputType.hashCode(); - return result; + return Objects.hash(fnClass, inputType, outputType); } @Override public String toString() { - return String.format( - "InvokerCacheKey{fnClass=%s, inputType=%s, outputType=%s}", - fnClass.getName(), inputType, outputType); + return MoreObjects.toStringHelper(this) + .add("fnClass", fnClass.getName()) + .add("inputType", inputType) + .add("outputType", outputType) + .toString(); } } @@ -317,9 +318,10 @@ public DoFnInvoker newByteBuddyInvoker( signature.fnClass(), fn.getClass()); - // Extract input and output type descriptors from the DoFn instance - // Fall back to Object.class if the type descriptors are null or unavailable (e.g., MapElements - // after serialization) + // Extract input and output type descriptors to distinguish generic instantiations. + // Fall back to Object.class if unavailable. When type info is lost, different generic + // instantiations share an invoker, which is acceptable since the DoFn class in the cache + // key prevents collisions between different DoFn classes. TypeDescriptor inputType; try { inputType = fn.getInputTypeDescriptor(); @@ -540,6 +542,26 @@ public static double validateSize(double size) { } } + /** + * Generates a type suffix string for use in invoker class names. + * + *

This creates a unique suffix based on the input and output type descriptors to avoid class + * name collisions when the same DoFn class is used with different generic types. + * + *

The format is: {@code DoFnInvoker$<8-digit hex hash>} + * + * @param inputType the input type descriptor + * @param outputType the output type descriptor + * @return a string suffix for the invoker class name + */ + public static String generateTypeSuffix( + TypeDescriptor inputType, TypeDescriptor outputType) { + return String.format( + "%s$%08x", + DoFnInvoker.class.getSimpleName(), + (inputType.toString() + "|" + outputType.toString()).hashCode()); + } + /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ private static Class> generateInvokerClass( DoFnSignature signature, TypeDescriptor inputType, TypeDescriptor outputType) { @@ -547,11 +569,7 @@ public static double validateSize(double size) { // Create a unique suffix based on the type descriptors to avoid class name collisions // when the same DoFn class is used with different generic types. - String typeSuffix = - String.format( - "%s$%08x", - DoFnInvoker.class.getSimpleName(), - (inputType.toString() + "|" + outputType.toString()).hashCode()); + String typeSuffix = generateTypeSuffix(inputType, outputType); final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 679c26cf7e9f..186d58e33189 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -1389,11 +1389,7 @@ public void testStableName() { // different generic instantiations of the same DoFn class. 
// Format: $$ TypeDescriptor voidType = new StableNameTestDoFn().getInputTypeDescriptor(); - String expectedTypeSuffix = - String.format( - "%s$%08x", - DoFnInvoker.class.getSimpleName(), - (voidType.toString() + "|" + voidType.toString()).hashCode()); + String expectedTypeSuffix = ByteBuddyDoFnInvokerFactory.generateTypeSuffix(voidType, voidType); assertThat( invoker.getClass().getName(), equalTo(String.format("%s$%s", StableNameTestDoFn.class.getName(), expectedTypeSuffix))); From 10703d1dcf8fbcd6f386abc12b2318f9b4dfcd87 Mon Sep 17 00:00:00 2001 From: Elia LIU Date: Wed, 18 Feb 2026 00:50:50 +0000 Subject: [PATCH 4/4] Trigger extensive testing: PostCommit Java/Python & ValidatesRunner --- .github/trigger_files/beam_PostCommit_Java.json | 2 +- .../beam_PostCommit_Java_ValidatesRunner_Dataflow.json | 1 + .../beam_PostCommit_Java_ValidatesRunner_Direct.json | 1 + .../beam_PostCommit_Java_ValidatesRunner_Flink.json | 1 + .github/trigger_files/beam_PostCommit_Python.json | 2 +- 5 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 1bd74515152c..756b765e59e3 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 4 -} \ No newline at end of file +} diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index 39523ea7c0fb..a89f7adb4ce8 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -2,3 +2,4 @@ "comment": "Modify this file in a trivial way to cause this test suite to run!", "modification": 3, } + diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json index 7e7462c0b059..31caa31981ea 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json @@ -6,3 +6,4 @@ "https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test", "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } + diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json index afda4087adf8..55a372459000 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json @@ -7,3 +7,4 @@ "runFor": "#33606", "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } + diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index e43868bf4f24..82755946a988 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -2,4 +2,4 @@ "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "36271", "modification": 37 -} \ No newline at end of file +}