diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 17947a45b5b19..9d7e025d71d06 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -337,6 +337,8 @@ jobs: group: CLIENT - name: Pulsar Metadata group: METADATA + - name: Protobuf v4 + group: PROTOBUFV4 steps: - name: checkout diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 9653a787325fa..5286d34d6c4e9 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -115,6 +115,14 @@ function test_group_metadata() { mvn_test -pl pulsar-metadata -DtestReuseFork=false } +function test_group_protobufv4() { + mvn_test --clean --install \ + -Dprotobuf3.version=4.31.1 \ + -Dprotoc3.version=4.31.1 \ + -pl pulsar-client,pulsar-functions/instance \ + -Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest +} + # prints summaries of failed tests to console # by using the targer/surefire-reports files # works only when testForkCount > 1 since that is when surefire will create reports for individual test classes diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 3089684a1f9d4..64862edca1e95 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -300,7 +300,7 @@ default Optional getNativeSchema() { * @param clazz the Protobuf generated class to be used to extract the schema * @return a Schema instance */ - static Schema PROTOBUF(Class clazz) { + static Schema PROTOBUF(Class clazz) { return DefaultImplementation.getDefaultImplementation() .newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build()); } @@ -311,7 +311,7 @@ static Schema PROTOBUF(Cla * @param schemaDefinition schemaDefinition the definition of the schema * @return a Schema instance */ - static Schema PROTOBUF(SchemaDefinition schemaDefinition) { + static Schema PROTOBUF(SchemaDefinition schemaDefinition) { return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition); } @@ -321,7 +321,7 @@ static Schema PROTOBUF(Sch * @param clazz the Protobuf generated class to be used to extract the schema * @return a Schema instance */ - static Schema PROTOBUF_NATIVE(Class clazz) { + static Schema PROTOBUF_NATIVE(Class clazz) { return DefaultImplementation.getDefaultImplementation() .newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build()); } @@ -332,7 +332,7 @@ static Schema PROTOBUF_NAT * @param schemaDefinition schemaDefinition the definition of the schema * @return a Schema instance */ - static Schema PROTOBUF_NATIVE( + static Schema PROTOBUF_NATIVE( SchemaDefinition schemaDefinition) { return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index b5f2a3a468e81..f13cc003f5be7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -116,9 +116,9 @@ Authentication createAuthentication(String authPluginClassName, Map Schema newAvroSchema(SchemaDefinition schemaDefinition); - Schema newProtobufSchema(SchemaDefinition schemaDefinition); + Schema newProtobufSchema(SchemaDefinition schemaDefinition); - Schema newProtobufNativeSchema( + Schema newProtobufNativeSchema( SchemaDefinition schemaDefinition); Schema newJSONSchema(SchemaDefinition schemaDefinition); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 0351477985f7e..0b61540821a7c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -210,12 +210,12 @@ public Schema newAvroSchema(SchemaDefinition schemaDefinition) { return AvroSchema.of(schemaDefinition); } - public Schema newProtobufSchema( + public Schema newProtobufSchema( SchemaDefinition schemaDefinition) { return ProtobufSchema.of(schemaDefinition); } - public Schema newProtobufNativeSchema( + public Schema newProtobufNativeSchema( SchemaDefinition schemaDefinition) { return ProtobufNativeSchema.of(schemaDefinition); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java index 46a2f7d806a61..9243303e84288 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.Descriptors; -import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.Message; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.HashMap; @@ -41,7 +41,7 @@ /** * A schema implementation to deal with protobuf generated messages. */ -public class ProtobufNativeSchema extends AbstractStructSchema { +public class ProtobufNativeSchema extends AbstractStructSchema { public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__"; @@ -104,7 +104,7 @@ public Optional getNativeSchema() { return Optional.of(getProtobufNativeSchema()); } - public static ProtobufNativeSchema of(Class pojo) { + public static ProtobufNativeSchema of(Class pojo) { return of(pojo, new HashMap<>()); } @@ -117,8 +117,8 @@ public static ProtobufNativeSchema ofGenericClass(Class pojo, Map ProtobufNativeSchema of(SchemaDefinition schemaDefinition) { Class pojo = schemaDefinition.getPojo(); - if (!GeneratedMessageV3.class.isAssignableFrom(pojo)) { - throw new IllegalArgumentException(GeneratedMessageV3.class.getName() + if (!Message.class.isAssignableFrom(pojo)) { + throw new IllegalArgumentException(Message.class.getName() + " is not assignable from " + pojo.getName()); } Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo()); @@ -131,14 +131,13 @@ public static ProtobufNativeSchema of(SchemaDefinition schemaDefinition) .build(); try { return new ProtobufNativeSchema(schemaInfo, - (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null)); + (Message) pojo.getMethod("getDefaultInstance").invoke(null)); } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new IllegalArgumentException(e); } } - public static ProtobufNativeSchema of( - Class pojo, Map properties) { + public static ProtobufNativeSchema of(Class pojo, Map properties) { return ofGenericClass(pojo, properties); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java index 2e33bcda299ad..71e0bed69120e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java @@ -21,7 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.Descriptors; -import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.Message; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.LinkedList; @@ -41,7 +41,7 @@ /** * A schema implementation to deal with protobuf generated messages. */ -public class ProtobufSchema extends AvroBaseStructSchema { +public class ProtobufSchema extends AvroBaseStructSchema { public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__"; @@ -53,7 +53,7 @@ public static class ProtoBufParsingInfo { private final String type; private final String label; // For future nested fields - private final Map definition; + private final Map definition; } private static org.apache.avro.Schema createProtobufAvroSchema(Class pojo) { @@ -89,7 +89,7 @@ public void accept(Descriptors.FieldDescriptor fieldDescriptor) { } } - public static ProtobufSchema of(Class pojo) { + public static ProtobufSchema of(Class pojo) { return of(pojo, new HashMap<>()); } @@ -102,8 +102,8 @@ public static ProtobufSchema ofGenericClass(Class pojo, Map ProtobufSchema of(SchemaDefinition schemaDefinition) { Class pojo = schemaDefinition.getPojo(); - if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) { - throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName() + if (!Message.class.isAssignableFrom(pojo)) { + throw new IllegalArgumentException(Message.class.getName() + " is not assignable from " + pojo.getName()); } @@ -116,14 +116,13 @@ public static ProtobufSchema of(SchemaDefinition schemaDefinition) { try { return new ProtobufSchema(schemaInfo, - (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null)); + (Message) pojo.getMethod("getDefaultInstance").invoke(null)); } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new IllegalArgumentException(e); } } - public static ProtobufSchema of( - Class pojo, Map properties){ + public static ProtobufSchema of(Class pojo, Map properties) { return ofGenericClass(pojo, properties); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java index 1c10608d448ea..0ecc0be3d8cb3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl.schema.reader; -public class ProtobufNativeReader extends ProtobufReader { +import com.google.protobuf.Message; + +public class ProtobufNativeReader extends ProtobufReader { public ProtobufNativeReader(T protoMessageInstance) { super(protoMessageInstance); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java index a2504a660a041..a56fb30b1406b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl.schema.reader; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import com.google.protobuf.Parser; import java.io.IOException; import java.io.InputStream; @@ -27,7 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProtobufReader implements SchemaReader { +public class ProtobufReader implements SchemaReader { private Parser tParser; public ProtobufReader(T protoMessageInstance) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java index 32569f7b7c05f..0abd231475fbc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl.schema.writer; -public class ProtobufNativeWriter extends ProtobufWriter { +import com.google.protobuf.Message; + +public class ProtobufNativeWriter extends ProtobufWriter { public ProtobufNativeWriter() { super(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java index 52ccec8dfaaca..7bab227e18156 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.client.impl.schema.writer; +import com.google.protobuf.Message; import org.apache.pulsar.client.api.schema.SchemaWriter; -public class ProtobufWriter implements SchemaWriter { +public class ProtobufWriter implements SchemaWriter { @Override public byte[] write(T message) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java new file mode 100644 index 0000000000000..f9ce4ecbf9746 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.google.protobuf.Message; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.internal.PulsarClientImplementationBinding; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ProtobufSchemaApiSignatureTest { + + @Test + public void testSchemaProtobufTypeBounds() throws NoSuchMethodException { + assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", Class.class), Message.class); + assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", SchemaDefinition.class), Message.class); + assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF_NATIVE", Class.class), Message.class); + assertSingleTypeParamUpperBound( + Schema.class.getMethod("PROTOBUF_NATIVE", SchemaDefinition.class), Message.class); + } + + @Test + public void testBindingProtobufTypeBounds() throws NoSuchMethodException { + assertSingleTypeParamUpperBound( + PulsarClientImplementationBinding.class.getMethod("newProtobufSchema", SchemaDefinition.class), + Message.class); + assertSingleTypeParamUpperBound( + PulsarClientImplementationBinding.class.getMethod("newProtobufNativeSchema", SchemaDefinition.class), + Message.class); + } + + private static void assertSingleTypeParamUpperBound(Method method, Type expectedBound) { + TypeVariable[] typeParameters = method.getTypeParameters(); + Assert.assertEquals(typeParameters.length, 1, method + " should define one type parameter"); + Type[] bounds = typeParameters[0].getBounds(); + Assert.assertEquals(bounds.length, 1, method + " should define one type bound"); + Assert.assertEquals(bounds[0], expectedBound, method + " has unexpected type bound"); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java index a0ea3c1cc6e95..ed918cd0b8cd3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.StringValue; import com.google.protobuf.util.JsonFormat; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -93,6 +95,19 @@ public void testEncodeAndDecode() { assertEquals(message.getStringField(), stringFieldValue); } + @Test + public void testSchemaApiSupportsMessageBound() { + Any any = Any.pack(StringValue.newBuilder().setValue("native-message").build()); + org.apache.pulsar.client.api.Schema protobufSchema = + org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE(Any.class); + + byte[] bytes = protobufSchema.encode(any); + Any message = protobufSchema.decode(bytes); + + assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE); + assertEquals(message, any); + } + @Test public void testSchema() throws Exception { ProtobufNativeSchema protobufSchema = diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java index 5acdd5b1b1cb6..9e283b6799dfd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.protobuf.Any; +import com.google.protobuf.StringValue; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.util.ArrayList; @@ -121,6 +123,17 @@ public void testEncodeAndDecode() { Assert.assertEquals(message.getName(), NAME); } + @Test + public void testSchemaApiSupportsMessageBound() { + Any any = Any.pack(StringValue.newBuilder().setValue(NAME).build()); + org.apache.pulsar.client.api.Schema protobufSchema = + org.apache.pulsar.client.api.Schema.PROTOBUF(Any.class); + + byte[] bytes = protobufSchema.encode(any); + Any message = protobufSchema.decode(bytes); + Assert.assertEquals(message, any); + } + @Test public void testSchema() { ProtobufSchema protobufSchema = diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 50cb1fb54e82f..56dc802affb4b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -1182,7 +1182,7 @@ private static SchemaType getDefaultSchemaType(Class clazz) { private static boolean isProtobufClass(Class pojoClazz) { try { - Class protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3"); + Class protobufBaseClass = Class.forName("com.google.protobuf.Message"); return protobufBaseClass.isAssignableFrom(pojoClazz); } catch (ClassNotFoundException | NoClassDefFoundError e) { // If sink does not have protobuf in classpath then it cannot be protobuf diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index d8ae6b19f4a7e..81f4de4968288 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -196,7 +196,7 @@ private static Schema newSchemaInstance(Class clazz, SchemaType type, private static boolean isProtobufClass(Class pojoClazz) { try { - Class protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3"); + Class protobufBaseClass = Class.forName("com.google.protobuf.Message"); return protobufBaseClass.isAssignableFrom(pojoClazz); } catch (ClassNotFoundException | NoClassDefFoundError e) { // If function does not have protobuf in classpath then it cannot be protobuf diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 7cae03c8f5f7b..eea1c9cc96628 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.protobuf.Any; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; @@ -44,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; @@ -245,6 +247,14 @@ public void testStatsManagerNull() throws Exception { Assert.assertEquals(javaInstanceRunnable.getMetrics(), InstanceCommunication.MetricsData.newBuilder().build()); } + @Test + public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() throws Exception { + Method method = JavaInstanceRunnable.class.getDeclaredMethod("getDefaultSchemaType", Class.class); + method.setAccessible(true); + SchemaType schemaType = (SchemaType) method.invoke(null, Any.class); + Assert.assertEquals(schemaType, SchemaType.PROTOBUF); + } + @Test public void testSinkConfigParsingPreservesOriginalType() throws Exception { final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java index fa6d1a533dc88..6b065fc237a94 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java @@ -18,10 +18,16 @@ */ package org.apache.pulsar.functions.source; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import com.google.protobuf.Any; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; @@ -53,6 +59,16 @@ public void testGetSchema() { assertEquals(schema.getClass(), ProtobufNativeSchema.class); } + @Test + public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + TopicSchema topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader()); + Schema schema = topicSchema.getSchema("public/default/test-protobuf-default", Any.class, Optional.empty()); + assertEquals(schema.getClass(), ProtobufSchema.class); + } + private static class DummyClass { } }