Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ jobs:
group: CLIENT
- name: Pulsar Metadata
group: METADATA
- name: Protobuf v4
group: PROTOBUFV4

steps:
- name: checkout
Expand Down
8 changes: 8 additions & 0 deletions build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ function test_group_metadata() {
mvn_test -pl pulsar-metadata -DtestReuseFork=false
}

function test_group_protobufv4() {
mvn_test --clean --install \
-Dprotobuf3.version=4.31.1 \
-Dprotoc3.version=4.31.1 \
-pl pulsar-client,pulsar-functions/instance \
-Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest
}

# prints summaries of failed tests to console
# by using the targer/surefire-reports files
# works only when testForkCount > 1 since that is when surefire will create reports for individual test classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ default Optional<Object> getNativeSchema() {
* @param clazz the Protobuf generated class to be used to extract the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
}
Expand All @@ -311,7 +311,7 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Cla
* @param schemaDefinition schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition);
}

Expand All @@ -321,7 +321,7 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Sch
* @param clazz the Protobuf generated class to be used to extract the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NATIVE(Class<T> clazz) {
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF_NATIVE(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build());
}
Expand All @@ -332,7 +332,7 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NAT
* @param schemaDefinition schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NATIVE(
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF_NATIVE(
SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ Authentication createAuthentication(String authPluginClassName, Map<String, Stri

<T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition);

<T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(SchemaDefinition schemaDefinition);
<T extends com.google.protobuf.Message> Schema<T> newProtobufSchema(SchemaDefinition schemaDefinition);

<T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSchema(
<T extends com.google.protobuf.Message> Schema<T> newProtobufNativeSchema(
SchemaDefinition schemaDefinition);

<T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ public <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return AvroSchema.of(schemaDefinition);
}

public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(
public <T extends com.google.protobuf.Message> Schema<T> newProtobufSchema(
SchemaDefinition schemaDefinition) {
return ProtobufSchema.of(schemaDefinition);
}

public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSchema(
public <T extends com.google.protobuf.Message> Schema<T> newProtobufNativeSchema(
SchemaDefinition schemaDefinition) {
return ProtobufNativeSchema.of(schemaDefinition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.Message;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
Expand All @@ -41,7 +41,7 @@
/**
* A schema implementation to deal with protobuf generated messages.
*/
public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends AbstractStructSchema<T> {
public class ProtobufNativeSchema<T extends Message> extends AbstractStructSchema<T> {

public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";

Expand Down Expand Up @@ -104,7 +104,7 @@ public Optional<Object> getNativeSchema() {
return Optional.of(getProtobufNativeSchema());
}

public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(Class<T> pojo) {
public static <T extends Message> ProtobufNativeSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}

Expand All @@ -117,8 +117,8 @@ public static <T> ProtobufNativeSchema ofGenericClass(Class<T> pojo, Map<String,
public static <T> ProtobufNativeSchema of(SchemaDefinition<T> schemaDefinition) {
Class<T> pojo = schemaDefinition.getPojo();

if (!GeneratedMessageV3.class.isAssignableFrom(pojo)) {
throw new IllegalArgumentException(GeneratedMessageV3.class.getName()
if (!Message.class.isAssignableFrom(pojo)) {
throw new IllegalArgumentException(Message.class.getName()
+ " is not assignable from " + pojo.getName());
}
Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo());
Expand All @@ -131,14 +131,13 @@ public static <T> ProtobufNativeSchema of(SchemaDefinition<T> schemaDefinition)
.build();
try {
return new ProtobufNativeSchema(schemaInfo,
(GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null));
(Message) pojo.getMethod("getDefaultInstance").invoke(null));
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}

public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(
Class pojo, Map<String, String> properties) {
public static <T extends Message> ProtobufNativeSchema<T> of(Class<T> pojo, Map<String, String> properties) {
return ofGenericClass(pojo, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.Message;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -41,7 +41,7 @@
/**
* A schema implementation to deal with protobuf generated messages.
*/
public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends AvroBaseStructSchema<T> {
public class ProtobufSchema<T extends Message> extends AvroBaseStructSchema<T> {

public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";

Expand All @@ -53,7 +53,7 @@ public static class ProtoBufParsingInfo {
private final String type;
private final String label;
// For future nested fields
private final Map <String, Object> definition;
private final Map<String, Object> definition;
}

private static <T> org.apache.avro.Schema createProtobufAvroSchema(Class<T> pojo) {
Expand Down Expand Up @@ -89,7 +89,7 @@ public void accept(Descriptors.FieldDescriptor fieldDescriptor) {
}
}

public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
public static <T extends Message> ProtobufSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}

Expand All @@ -102,8 +102,8 @@ public static <T> ProtobufSchema ofGenericClass(Class<T> pojo, Map<String, Strin
public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {
Class<T> pojo = schemaDefinition.getPojo();

if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
if (!Message.class.isAssignableFrom(pojo)) {
throw new IllegalArgumentException(Message.class.getName()
+ " is not assignable from " + pojo.getName());
}

Expand All @@ -116,14 +116,13 @@ public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {

try {
return new ProtobufSchema(schemaInfo,
(GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null));
(Message) pojo.getMethod("getDefaultInstance").invoke(null));
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}

public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(
Class pojo, Map<String, String> properties){
public static <T extends Message> ProtobufSchema<T> of(Class<T> pojo, Map<String, String> properties) {
return ofGenericClass(pojo, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl.schema.reader;

public class ProtobufNativeReader<T extends com.google.protobuf.GeneratedMessageV3> extends ProtobufReader<T> {
import com.google.protobuf.Message;

public class ProtobufNativeReader<T extends Message> extends ProtobufReader<T> {

public ProtobufNativeReader(T protoMessageInstance) {
super(protoMessageInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.schema.reader;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -27,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaReader<T> {
public class ProtobufReader<T extends Message> implements SchemaReader<T> {
private Parser<T> tParser;

public ProtobufReader(T protoMessageInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl.schema.writer;

public class ProtobufNativeWriter<T extends com.google.protobuf.GeneratedMessageV3> extends ProtobufWriter<T> {
import com.google.protobuf.Message;

public class ProtobufNativeWriter<T extends Message> extends ProtobufWriter<T> {

public ProtobufNativeWriter() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
*/
package org.apache.pulsar.client.impl.schema.writer;

import com.google.protobuf.Message;
import org.apache.pulsar.client.api.schema.SchemaWriter;

public class ProtobufWriter<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaWriter<T> {
public class ProtobufWriter<T extends Message> implements SchemaWriter<T> {

@Override
public byte[] write(T message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import com.google.protobuf.Message;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.PulsarClientImplementationBinding;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ProtobufSchemaApiSignatureTest {

@Test
public void testSchemaProtobufTypeBounds() throws NoSuchMethodException {
assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", Class.class), Message.class);
assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", SchemaDefinition.class), Message.class);
assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF_NATIVE", Class.class), Message.class);
assertSingleTypeParamUpperBound(
Schema.class.getMethod("PROTOBUF_NATIVE", SchemaDefinition.class), Message.class);
}

@Test
public void testBindingProtobufTypeBounds() throws NoSuchMethodException {
assertSingleTypeParamUpperBound(
PulsarClientImplementationBinding.class.getMethod("newProtobufSchema", SchemaDefinition.class),
Message.class);
assertSingleTypeParamUpperBound(
PulsarClientImplementationBinding.class.getMethod("newProtobufNativeSchema", SchemaDefinition.class),
Message.class);
}

private static void assertSingleTypeParamUpperBound(Method method, Type expectedBound) {
TypeVariable<Method>[] typeParameters = method.getTypeParameters();
Assert.assertEquals(typeParameters.length, 1, method + " should define one type parameter");
Type[] bounds = typeParameters[0].getBounds();
Assert.assertEquals(bounds.length, 1, method + " should define one type bound");
Assert.assertEquals(bounds[0], expectedBound, method + " has unexpected type bound");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand Down Expand Up @@ -93,6 +95,19 @@ public void testEncodeAndDecode() {
assertEquals(message.getStringField(), stringFieldValue);
}

@Test
public void testSchemaApiSupportsMessageBound() {
Any any = Any.pack(StringValue.newBuilder().setValue("native-message").build());
org.apache.pulsar.client.api.Schema<Any> protobufSchema =
org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE(Any.class);

byte[] bytes = protobufSchema.encode(any);
Any message = protobufSchema.decode(bytes);

assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);
assertEquals(message, any);
}

@Test
public void testSchema() throws Exception {
ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.Any;
import com.google.protobuf.StringValue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
Expand Down Expand Up @@ -121,6 +123,17 @@ public void testEncodeAndDecode() {
Assert.assertEquals(message.getName(), NAME);
}

@Test
public void testSchemaApiSupportsMessageBound() {
Any any = Any.pack(StringValue.newBuilder().setValue(NAME).build());
org.apache.pulsar.client.api.Schema<Any> protobufSchema =
org.apache.pulsar.client.api.Schema.PROTOBUF(Any.class);

byte[] bytes = protobufSchema.encode(any);
Any message = protobufSchema.decode(bytes);
Assert.assertEquals(message, any);
}

@Test
public void testSchema() {
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ private static SchemaType getDefaultSchemaType(Class<?> clazz) {

private static boolean isProtobufClass(Class<?> pojoClazz) {
try {
Class<?> protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3");
Class<?> protobufBaseClass = Class.forName("com.google.protobuf.Message");
return protobufBaseClass.isAssignableFrom(pojoClazz);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// If sink does not have protobuf in classpath then it cannot be protobuf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type,

private static boolean isProtobufClass(Class<?> pojoClazz) {
try {
Class<?> protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3");
Class<?> protobufBaseClass = Class.forName("com.google.protobuf.Message");
return protobufBaseClass.isAssignableFrom(pojoClazz);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// If function does not have protobuf in classpath then it cannot be protobuf
Expand Down
Loading
Loading