From 3a03e1515c0a87a84e5d1a36963d6110c75a1da3 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Wed, 25 Feb 2026 13:33:11 -0800 Subject: [PATCH 1/2] Fix protobuf v4 compatibility for schema APIs and functions --- .github/workflows/pulsar-ci.yaml | 102 +++++++++++++++++- .../org/apache/pulsar/client/api/Schema.java | 8 +- .../PulsarClientImplementationBinding.java | 4 +- ...PulsarClientImplementationBindingImpl.java | 4 +- .../impl/schema/ProtobufNativeSchema.java | 15 ++- .../client/impl/schema/ProtobufSchema.java | 17 ++- .../schema/reader/ProtobufNativeReader.java | 4 +- .../impl/schema/reader/ProtobufReader.java | 3 +- .../schema/writer/ProtobufNativeWriter.java | 4 +- .../impl/schema/writer/ProtobufWriter.java | 3 +- .../api/ProtobufSchemaApiSignatureTest.java | 58 ++++++++++ .../impl/schema/ProtobufNativeSchemaTest.java | 15 +++ .../impl/schema/ProtobufSchemaTest.java | 13 +++ .../instance/JavaInstanceRunnable.java | 2 +- .../pulsar/functions/source/TopicSchema.java | 2 +- .../instance/JavaInstanceRunnableTest.java | 10 ++ .../functions/source/TopicSchemaTest.java | 16 +++ 17 files changed, 248 insertions(+), 32 deletions(-) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 17947a45b5b19..a8603e012ffdf 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -536,6 +536,104 @@ jobs: with: action: wait + protobuf-v4-tests: + name: CI - Protobuf v4 - Client + Functions + runs-on: ubuntu-24.04 + timeout-minutes: 60 + needs: ['preconditions', 'build-and-license-check'] + if: ${{ needs.preconditions.outputs.docs_only != 'true' }} + env: + JOB_NAME: CI - Protobuf v4 - Client + Functions + DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version }} + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: Tune Runner VM + uses: ./.github/actions/tune-runner-vm + + - name: Setup ssh access to build runner VM + # ssh access is enabled for builds in own forks + if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} + uses: ./.github/actions/ssh-access + continue-on-error: true + with: + limit-access-to-actor: true + + - name: Cache Maven dependencies + uses: actions/cache@v4 + timeout-minutes: 5 + with: + path: | + ~/.m2/repository/*/*/* + !~/.m2/repository/org/apache/pulsar + key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-m2-dependencies-core-modules- + + - name: Set up JDK ${{ env.CI_JDK_MAJOR_VERSION }} + uses: actions/setup-java@v4 + with: + distribution: ${{ env.JDK_DISTRIBUTION }} + java-version: ${{ env.CI_JDK_MAJOR_VERSION }} + + - name: Install gh-actions-artifact-client.js + uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master + + - name: Restore maven build results from Github artifact cache + run: | + cd $HOME + $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries + $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh snapshot_pulsar_maven_artifacts + + - name: Run protobuf v4 client and functions compatibility tests + run: | + mvn -B -ntp \ + -DskipSourceReleaseAssembly=true \ + -DskipBuildDistribution=true \ + -Dspotbugs.skip=true \ + -Dlicense.skip=true \ + -Dcheckstyle.skip=true \ + -Drat.skip=true \ + -Dprotobuf3.version=4.31.1 \ + -Dprotoc3.version=4.31.1 \ + -pl pulsar-client,pulsar-functions/instance \ + -Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest \ + -Dsurefire.failIfNoSpecifiedTests=false \ + test + + - name: print JVM thread dumps when cancelled + if: cancelled() + run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh print_thread_dumps + + - name: Aggregates all test reports to ./test-reports and ./surefire-reports directories + if: ${{ always() }} + uses: ./.github/actions/copy-test-reports + + - name: Publish Test Report + uses: apache/pulsar-test-infra/action-junit-report@master + if: ${{ always() }} + with: + report_paths: 'test-reports/TEST-*.xml' + annotate_only: 'true' + + - name: Upload Surefire reports + uses: actions/upload-artifact@v4 + if: ${{ !success() }} + with: + name: Unit-PROTOBUF_V4_CLIENT_FUNCTIONS-surefire-reports + path: surefire-reports + retention-days: 7 + + - name: Wait for ssh connection when build fails + # ssh access is enabled for builds in own forks + uses: ./.github/actions/ssh-access + if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} + continue-on-error: true + with: + action: wait + pulsar-java-test-image: name: Build Pulsar java-test-image ${{ matrix.base.type }} docker image for ${{ matrix.platform }} runs-on: ubuntu-24.04 @@ -1680,6 +1778,7 @@ jobs: needs: [ 'preconditions', 'unit-tests', + 'protobuf-v4-tests', 'integration-tests', 'system-tests', 'flaky-system-tests', @@ -1697,6 +1796,7 @@ jobs: if [[ ! ( \ "${{ needs.preconditions.result }}" == "success" \ && "${{ needs.unit-tests.result }}" == "success" \ + && "${{ needs.protobuf-v4-tests.result }}" == "success" \ && "${{ needs.integration-tests.result }}" == "success" \ && "${{ needs.system-tests.result }}" == "success" \ && "${{ needs.macos-build.result }}" == "success" \ @@ -1722,4 +1822,4 @@ jobs: if: ${{ needs.preconditions.outputs.docs_only != 'true' && !contains(needs.*.result, 'failure') && !contains(needs.*.result, 'cancelled') }} run: | gh-actions-artifact-client.js delete pulsar-maven-repository-binaries.tar.zst || true - gh-actions-artifact-client.js delete pulsar-server-distribution.tar.zst || true \ No newline at end of file + gh-actions-artifact-client.js delete pulsar-server-distribution.tar.zst || true diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 3089684a1f9d4..64862edca1e95 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -300,7 +300,7 @@ default Optional getNativeSchema() { * @param clazz the Protobuf generated class to be used to extract the schema * @return a Schema instance */ - static Schema PROTOBUF(Class clazz) { + static Schema PROTOBUF(Class clazz) { return DefaultImplementation.getDefaultImplementation() .newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build()); } @@ -311,7 +311,7 @@ static Schema PROTOBUF(Cla * @param schemaDefinition schemaDefinition the definition of the schema * @return a Schema instance */ - static Schema PROTOBUF(SchemaDefinition schemaDefinition) { + static Schema PROTOBUF(SchemaDefinition schemaDefinition) { return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition); } @@ -321,7 +321,7 @@ static Schema PROTOBUF(Sch * @param clazz the Protobuf generated class to be used to extract the schema * @return a Schema instance */ - static Schema PROTOBUF_NATIVE(Class clazz) { + static Schema PROTOBUF_NATIVE(Class clazz) { return DefaultImplementation.getDefaultImplementation() .newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build()); } @@ -332,7 +332,7 @@ static Schema PROTOBUF_NAT * @param schemaDefinition schemaDefinition the definition of the schema * @return a Schema instance */ - static Schema PROTOBUF_NATIVE( + static Schema PROTOBUF_NATIVE( SchemaDefinition schemaDefinition) { return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index b5f2a3a468e81..f13cc003f5be7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -116,9 +116,9 @@ Authentication createAuthentication(String authPluginClassName, Map Schema newAvroSchema(SchemaDefinition schemaDefinition); - Schema newProtobufSchema(SchemaDefinition schemaDefinition); + Schema newProtobufSchema(SchemaDefinition schemaDefinition); - Schema newProtobufNativeSchema( + Schema newProtobufNativeSchema( SchemaDefinition schemaDefinition); Schema newJSONSchema(SchemaDefinition schemaDefinition); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 0351477985f7e..0b61540821a7c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -210,12 +210,12 @@ public Schema newAvroSchema(SchemaDefinition schemaDefinition) { return AvroSchema.of(schemaDefinition); } - public Schema newProtobufSchema( + public Schema newProtobufSchema( SchemaDefinition schemaDefinition) { return ProtobufSchema.of(schemaDefinition); } - public Schema newProtobufNativeSchema( + public Schema newProtobufNativeSchema( SchemaDefinition schemaDefinition) { return ProtobufNativeSchema.of(schemaDefinition); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java index 46a2f7d806a61..9243303e84288 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.Descriptors; -import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.Message; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.HashMap; @@ -41,7 +41,7 @@ /** * A schema implementation to deal with protobuf generated messages. */ -public class ProtobufNativeSchema extends AbstractStructSchema { +public class ProtobufNativeSchema extends AbstractStructSchema { public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__"; @@ -104,7 +104,7 @@ public Optional getNativeSchema() { return Optional.of(getProtobufNativeSchema()); } - public static ProtobufNativeSchema of(Class pojo) { + public static ProtobufNativeSchema of(Class pojo) { return of(pojo, new HashMap<>()); } @@ -117,8 +117,8 @@ public static ProtobufNativeSchema ofGenericClass(Class pojo, Map ProtobufNativeSchema of(SchemaDefinition schemaDefinition) { Class pojo = schemaDefinition.getPojo(); - if (!GeneratedMessageV3.class.isAssignableFrom(pojo)) { - throw new IllegalArgumentException(GeneratedMessageV3.class.getName() + if (!Message.class.isAssignableFrom(pojo)) { + throw new IllegalArgumentException(Message.class.getName() + " is not assignable from " + pojo.getName()); } Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo()); @@ -131,14 +131,13 @@ public static ProtobufNativeSchema of(SchemaDefinition schemaDefinition) .build(); try { return new ProtobufNativeSchema(schemaInfo, - (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null)); + (Message) pojo.getMethod("getDefaultInstance").invoke(null)); } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new IllegalArgumentException(e); } } - public static ProtobufNativeSchema of( - Class pojo, Map properties) { + public static ProtobufNativeSchema of(Class pojo, Map properties) { return ofGenericClass(pojo, properties); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java index 2e33bcda299ad..71e0bed69120e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java @@ -21,7 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.Descriptors; -import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.Message; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.LinkedList; @@ -41,7 +41,7 @@ /** * A schema implementation to deal with protobuf generated messages. */ -public class ProtobufSchema extends AvroBaseStructSchema { +public class ProtobufSchema extends AvroBaseStructSchema { public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__"; @@ -53,7 +53,7 @@ public static class ProtoBufParsingInfo { private final String type; private final String label; // For future nested fields - private final Map definition; + private final Map definition; } private static org.apache.avro.Schema createProtobufAvroSchema(Class pojo) { @@ -89,7 +89,7 @@ public void accept(Descriptors.FieldDescriptor fieldDescriptor) { } } - public static ProtobufSchema of(Class pojo) { + public static ProtobufSchema of(Class pojo) { return of(pojo, new HashMap<>()); } @@ -102,8 +102,8 @@ public static ProtobufSchema ofGenericClass(Class pojo, Map ProtobufSchema of(SchemaDefinition schemaDefinition) { Class pojo = schemaDefinition.getPojo(); - if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) { - throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName() + if (!Message.class.isAssignableFrom(pojo)) { + throw new IllegalArgumentException(Message.class.getName() + " is not assignable from " + pojo.getName()); } @@ -116,14 +116,13 @@ public static ProtobufSchema of(SchemaDefinition schemaDefinition) { try { return new ProtobufSchema(schemaInfo, - (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null)); + (Message) pojo.getMethod("getDefaultInstance").invoke(null)); } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new IllegalArgumentException(e); } } - public static ProtobufSchema of( - Class pojo, Map properties){ + public static ProtobufSchema of(Class pojo, Map properties) { return ofGenericClass(pojo, properties); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java index 1c10608d448ea..0ecc0be3d8cb3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl.schema.reader; -public class ProtobufNativeReader extends ProtobufReader { +import com.google.protobuf.Message; + +public class ProtobufNativeReader extends ProtobufReader { public ProtobufNativeReader(T protoMessageInstance) { super(protoMessageInstance); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java index a2504a660a041..a56fb30b1406b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl.schema.reader; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import com.google.protobuf.Parser; import java.io.IOException; import java.io.InputStream; @@ -27,7 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProtobufReader implements SchemaReader { +public class ProtobufReader implements SchemaReader { private Parser tParser; public ProtobufReader(T protoMessageInstance) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java index 32569f7b7c05f..0abd231475fbc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl.schema.writer; -public class ProtobufNativeWriter extends ProtobufWriter { +import com.google.protobuf.Message; + +public class ProtobufNativeWriter extends ProtobufWriter { public ProtobufNativeWriter() { super(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java index 52ccec8dfaaca..7bab227e18156 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.client.impl.schema.writer; +import com.google.protobuf.Message; import org.apache.pulsar.client.api.schema.SchemaWriter; -public class ProtobufWriter implements SchemaWriter { +public class ProtobufWriter implements SchemaWriter { @Override public byte[] write(T message) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java new file mode 100644 index 0000000000000..f9ce4ecbf9746 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.google.protobuf.Message; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.internal.PulsarClientImplementationBinding; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ProtobufSchemaApiSignatureTest { + + @Test + public void testSchemaProtobufTypeBounds() throws NoSuchMethodException { + assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", Class.class), Message.class); + assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", SchemaDefinition.class), Message.class); + assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF_NATIVE", Class.class), Message.class); + assertSingleTypeParamUpperBound( + Schema.class.getMethod("PROTOBUF_NATIVE", SchemaDefinition.class), Message.class); + } + + @Test + public void testBindingProtobufTypeBounds() throws NoSuchMethodException { + assertSingleTypeParamUpperBound( + PulsarClientImplementationBinding.class.getMethod("newProtobufSchema", SchemaDefinition.class), + Message.class); + assertSingleTypeParamUpperBound( + PulsarClientImplementationBinding.class.getMethod("newProtobufNativeSchema", SchemaDefinition.class), + Message.class); + } + + private static void assertSingleTypeParamUpperBound(Method method, Type expectedBound) { + TypeVariable[] typeParameters = method.getTypeParameters(); + Assert.assertEquals(typeParameters.length, 1, method + " should define one type parameter"); + Type[] bounds = typeParameters[0].getBounds(); + Assert.assertEquals(bounds.length, 1, method + " should define one type bound"); + Assert.assertEquals(bounds[0], expectedBound, method + " has unexpected type bound"); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java index a0ea3c1cc6e95..ed918cd0b8cd3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.StringValue; import com.google.protobuf.util.JsonFormat; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -93,6 +95,19 @@ public void testEncodeAndDecode() { assertEquals(message.getStringField(), stringFieldValue); } + @Test + public void testSchemaApiSupportsMessageBound() { + Any any = Any.pack(StringValue.newBuilder().setValue("native-message").build()); + org.apache.pulsar.client.api.Schema protobufSchema = + org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE(Any.class); + + byte[] bytes = protobufSchema.encode(any); + Any message = protobufSchema.decode(bytes); + + assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE); + assertEquals(message, any); + } + @Test public void testSchema() throws Exception { ProtobufNativeSchema protobufSchema = diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java index 5acdd5b1b1cb6..9e283b6799dfd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.protobuf.Any; +import com.google.protobuf.StringValue; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.util.ArrayList; @@ -121,6 +123,17 @@ public void testEncodeAndDecode() { Assert.assertEquals(message.getName(), NAME); } + @Test + public void testSchemaApiSupportsMessageBound() { + Any any = Any.pack(StringValue.newBuilder().setValue(NAME).build()); + org.apache.pulsar.client.api.Schema protobufSchema = + org.apache.pulsar.client.api.Schema.PROTOBUF(Any.class); + + byte[] bytes = protobufSchema.encode(any); + Any message = protobufSchema.decode(bytes); + Assert.assertEquals(message, any); + } + @Test public void testSchema() { ProtobufSchema protobufSchema = diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 50cb1fb54e82f..56dc802affb4b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -1182,7 +1182,7 @@ private static SchemaType getDefaultSchemaType(Class clazz) { private static boolean isProtobufClass(Class pojoClazz) { try { - Class protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3"); + Class protobufBaseClass = Class.forName("com.google.protobuf.Message"); return protobufBaseClass.isAssignableFrom(pojoClazz); } catch (ClassNotFoundException | NoClassDefFoundError e) { // If sink does not have protobuf in classpath then it cannot be protobuf diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index d8ae6b19f4a7e..81f4de4968288 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -196,7 +196,7 @@ private static Schema newSchemaInstance(Class clazz, SchemaType type, private static boolean isProtobufClass(Class pojoClazz) { try { - Class protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3"); + Class protobufBaseClass = Class.forName("com.google.protobuf.Message"); return protobufBaseClass.isAssignableFrom(pojoClazz); } catch (ClassNotFoundException | NoClassDefFoundError e) { // If function does not have protobuf in classpath then it cannot be protobuf diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 7cae03c8f5f7b..eea1c9cc96628 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.protobuf.Any; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; @@ -44,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; @@ -245,6 +247,14 @@ public void testStatsManagerNull() throws Exception { Assert.assertEquals(javaInstanceRunnable.getMetrics(), InstanceCommunication.MetricsData.newBuilder().build()); } + @Test + public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() throws Exception { + Method method = JavaInstanceRunnable.class.getDeclaredMethod("getDefaultSchemaType", Class.class); + method.setAccessible(true); + SchemaType schemaType = (SchemaType) method.invoke(null, Any.class); + Assert.assertEquals(schemaType, SchemaType.PROTOBUF); + } + @Test public void testSinkConfigParsingPreservesOriginalType() throws Exception { final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java index fa6d1a533dc88..6b065fc237a94 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java @@ -18,10 +18,16 @@ */ package org.apache.pulsar.functions.source; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import com.google.protobuf.Any; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; @@ -53,6 +59,16 @@ public void testGetSchema() { assertEquals(schema.getClass(), ProtobufNativeSchema.class); } + @Test + public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + TopicSchema topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader()); + Schema schema = topicSchema.getSchema("public/default/test-protobuf-default", Any.class, Optional.empty()); + assertEquals(schema.getClass(), ProtobufSchema.class); + } + private static class DummyClass { } } From d04cbd45029af527d7abb37ac5478de7a91c22b0 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 26 Feb 2026 12:04:53 +0200 Subject: [PATCH 2/2] Use run_unit_group.sh for test commands so that unit-tests matrix can be used --- .github/workflows/pulsar-ci.yaml | 104 +------------------------------ build/run_unit_group.sh | 8 +++ 2 files changed, 11 insertions(+), 101 deletions(-) diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index a8603e012ffdf..9d7e025d71d06 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -337,6 +337,8 @@ jobs: group: CLIENT - name: Pulsar Metadata group: METADATA + - name: Protobuf v4 + group: PROTOBUFV4 steps: - name: checkout @@ -536,104 +538,6 @@ jobs: with: action: wait - protobuf-v4-tests: - name: CI - Protobuf v4 - Client + Functions - runs-on: ubuntu-24.04 - timeout-minutes: 60 - needs: ['preconditions', 'build-and-license-check'] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - env: - JOB_NAME: CI - Protobuf v4 - Client + Functions - DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version }} - steps: - - name: checkout - uses: actions/checkout@v4 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache Maven dependencies - uses: actions/cache@v4 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK ${{ env.CI_JDK_MAJOR_VERSION }} - uses: actions/setup-java@v4 - with: - distribution: ${{ env.JDK_DISTRIBUTION }} - java-version: ${{ env.CI_JDK_MAJOR_VERSION }} - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh snapshot_pulsar_maven_artifacts - - - name: Run protobuf v4 client and functions compatibility tests - run: | - mvn -B -ntp \ - -DskipSourceReleaseAssembly=true \ - -DskipBuildDistribution=true \ - -Dspotbugs.skip=true \ - -Dlicense.skip=true \ - -Dcheckstyle.skip=true \ - -Drat.skip=true \ - -Dprotobuf3.version=4.31.1 \ - -Dprotoc3.version=4.31.1 \ - -pl pulsar-client,pulsar-functions/instance \ - -Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest \ - -Dsurefire.failIfNoSpecifiedTests=false \ - test - - - name: print JVM thread dumps when cancelled - if: cancelled() - run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh print_thread_dumps - - - name: Aggregates all test reports to ./test-reports and ./surefire-reports directories - if: ${{ always() }} - uses: ./.github/actions/copy-test-reports - - - name: Publish Test Report - uses: apache/pulsar-test-infra/action-junit-report@master - if: ${{ always() }} - with: - report_paths: 'test-reports/TEST-*.xml' - annotate_only: 'true' - - - name: Upload Surefire reports - uses: actions/upload-artifact@v4 - if: ${{ !success() }} - with: - name: Unit-PROTOBUF_V4_CLIENT_FUNCTIONS-surefire-reports - path: surefire-reports - retention-days: 7 - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - pulsar-java-test-image: name: Build Pulsar java-test-image ${{ matrix.base.type }} docker image for ${{ matrix.platform }} runs-on: ubuntu-24.04 @@ -1778,7 +1682,6 @@ jobs: needs: [ 'preconditions', 'unit-tests', - 'protobuf-v4-tests', 'integration-tests', 'system-tests', 'flaky-system-tests', @@ -1796,7 +1699,6 @@ jobs: if [[ ! ( \ "${{ needs.preconditions.result }}" == "success" \ && "${{ needs.unit-tests.result }}" == "success" \ - && "${{ needs.protobuf-v4-tests.result }}" == "success" \ && "${{ needs.integration-tests.result }}" == "success" \ && "${{ needs.system-tests.result }}" == "success" \ && "${{ needs.macos-build.result }}" == "success" \ @@ -1822,4 +1724,4 @@ jobs: if: ${{ needs.preconditions.outputs.docs_only != 'true' && !contains(needs.*.result, 'failure') && !contains(needs.*.result, 'cancelled') }} run: | gh-actions-artifact-client.js delete pulsar-maven-repository-binaries.tar.zst || true - gh-actions-artifact-client.js delete pulsar-server-distribution.tar.zst || true + gh-actions-artifact-client.js delete pulsar-server-distribution.tar.zst || true \ No newline at end of file diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 9653a787325fa..5286d34d6c4e9 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -115,6 +115,14 @@ function test_group_metadata() { mvn_test -pl pulsar-metadata -DtestReuseFork=false } +function test_group_protobufv4() { + mvn_test --clean --install \ + -Dprotobuf3.version=4.31.1 \ + -Dprotoc3.version=4.31.1 \ + -pl pulsar-client,pulsar-functions/instance \ + -Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest +} + # prints summaries of failed tests to console # by using the targer/surefire-reports files # works only when testForkCount > 1 since that is when surefire will create reports for individual test classes