This repository was archived by the owner on Apr 1, 2024. It is now read-only.
forked from apache/pulsar
ISSUE-19565: PIP-246: Improved PROTOBUF_NATIVE schema compatibility checks without using avro-protobuf #5538
Open
Original Issue: apache#19565
Motivation
In apache#19385:
The current implementation checks only for changing a root message name.
https://github.com/apache/pulsar/blob/34c18704ce759922ce45820321af44b382a28e10/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java#L67-L72
In addition, FULL is the only schema compatibility strategy supported, so the PROTOBUF_NATIVE schema compatibility checks need to be improved.
Goal
- Implement a ProtobufNativeSchemaValidator, modeled on the SchemaValidatorBuilder and SchemaValidator in Avro (data written with one schema can be read using another).
- Define and implement incompatibility rules between different "versions" of a Protobuf schema, and throw an exception (schema incompatibility) when a rule is matched.
API Changes
- Add ProtobufNativeSchemaValidator
public interface ProtobufNativeSchemaValidator {
    void validate(Iterable<Descriptors.Descriptor> fromDescriptor, Descriptors.Descriptor toDescriptor)
            throws ProtoBufCanReadCheckException;
}
- Add ProtobufNativeSchemaValidationStrategy
public enum ProtobufNativeSchemaValidationStrategy {
    /**
     * a schema can be used to read existing schema(s).
     */
    CanReadExistingStrategy,
    /**
     * a schema can be read by existing schema(s).
     */
    CanBeReadByExistingStrategy,
    /**
     * a schema can read existing schema(s), and can be read by existing schema(s).
     */
    CanBeReadMutualStrategy
}
- Add ProtobufNativeSchemaValidatorBuilder
public class ProtobufNativeSchemaValidatorBuilder {
    private ProtobufNativeSchemaValidationStrategy strategy;
    private boolean onlyValidateLatest;

    public ProtobufNativeSchemaValidatorBuilder validatorStrategy(
            @NonNull ProtobufNativeSchemaValidationStrategy strategy) {
        this.strategy = strategy;
        return this;
    }

    public ProtobufNativeSchemaValidatorBuilder isOnlyValidateLatest(boolean onlyValidateLatest) {
        this.onlyValidateLatest = onlyValidateLatest;
        return this;
    }

    public ProtobufNativeSchemaValidator build() {
        return new ProtobufNativeSchemaBreakValidatorImpl(strategy, onlyValidateLatest);
    }
}
Implementation
- In ProtobufNativeSchemaCompatibilityCheck.
@Override
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy)
        throws IncompatibleSchemaException {
    checkCompatible(Collections.singletonList(from), to, strategy);
}

@Override
public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy)
        throws IncompatibleSchemaException {
    checkArgument(from != null, "check compatibility list is null");
    LinkedList<Descriptor> fromList = new LinkedList<>();
    try {
        for (SchemaData schemaData : from) {
            fromList.addFirst(ProtobufNativeSchemaUtils.deserialize(schemaData.getData()));
        }
        Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData());
        ProtobufNativeSchemaValidator schemaValidator = createSchemaValidator(strategy);
        schemaValidator.validate(fromList, toDescriptor);
    } catch (ProtoBufCanReadCheckException e) {
        throw new IncompatibleSchemaException(e);
    }
}

static ProtobufNativeSchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) {
    final ProtobufNativeSchemaValidatorBuilder schemaValidatorBuilder = new ProtobufNativeSchemaValidatorBuilder();
    return switch (compatibilityStrategy) {
        case BACKWARD_TRANSITIVE -> schemaValidatorBuilder
                .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy)
                .isOnlyValidateLatest(false).build();
        case BACKWARD -> schemaValidatorBuilder
                .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanReadExistingStrategy)
                .isOnlyValidateLatest(true).build();
        case FORWARD_TRANSITIVE -> schemaValidatorBuilder
                .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy)
                .isOnlyValidateLatest(false).build();
        case FORWARD -> schemaValidatorBuilder
                .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadByExistingStrategy)
                .isOnlyValidateLatest(true).build();
        case FULL_TRANSITIVE -> schemaValidatorBuilder
                .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy)
                .isOnlyValidateLatest(false).build();
        case FULL -> schemaValidatorBuilder
                .validatorStrategy(ProtobufNativeSchemaValidationStrategy.CanBeReadMutualStrategy)
                .isOnlyValidateLatest(true).build();
        case ALWAYS_COMPATIBLE -> ProtobufNativeAlwaysCompatibleValidator.INSTANCE;
        default -> ProtobufNativeNeverCompatibleValidator.INSTANCE;
    };
}
canRead() will check that the written schema can be read by another.
private void validateWithStrategy(Descriptors.Descriptor toValidate, Descriptors.Descriptor fromDescriptor)
        throws ProtoBufCanReadCheckException {
    switch (strategy) {
        case CanReadExistingStrategy -> canRead(fromDescriptor, toValidate);
        case CanBeReadByExistingStrategy -> canRead(toValidate, fromDescriptor);
        case CanBeReadMutualStrategy -> {
            canRead(toValidate, fromDescriptor);
            canRead(fromDescriptor, toValidate);
        }
    }
}

private void canRead(Descriptors.Descriptor writtenSchema, Descriptors.Descriptor readSchema)
        throws ProtoBufCanReadCheckException {
    ProtobufNativeSchemaBreakCheckUtils.checkSchemaCompatibility(writtenSchema, readSchema);
}
- Backward (CanReadExistingStrategy): The schema of the "old" version is writtenSchema, and the schema of the "new" version is readSchema.
- Forward (CanBeReadByExistingStrategy): The schema of the "new" version is writtenSchema, and the schema of the "old" version is readSchema.
- Full (CanBeReadMutualStrategy): Both checks are needed.
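The direction of each check, and the effect of onlyValidateLatest, can be illustrated with a minimal self-contained sketch. It does not use the Protobuf Descriptor API: the `StrategySketch` class, the `Schema` record, the toy `canRead` rule (a reader can read a writer's data if every field the reader requires exists in the writer's schema), and the newest-first ordering of the existing schemas are all assumptions made for illustration, not part of the proposal.

```java
import java.util.List;
import java.util.Set;

// Illustrative sketch only: a toy model, not the Protobuf Descriptor API.
class StrategySketch {
    enum Strategy { CanReadExistingStrategy, CanBeReadByExistingStrategy, CanBeReadMutualStrategy }

    // Toy schema (assumption): all field numbers, plus the subset that is required.
    record Schema(Set<Integer> fields, Set<Integer> required) {}

    // Toy rule (assumption): data written with `written` is readable with `read`
    // when every field that `read` requires exists in `written`.
    static boolean canRead(Schema written, Schema read) {
        return written.fields().containsAll(read.required());
    }

    // Picks the direction of the check, mirroring validateWithStrategy above.
    static boolean validate(Strategy strategy, Schema existing, Schema toValidate) {
        return switch (strategy) {
            // Backward: old schema writes, new schema reads.
            case CanReadExistingStrategy -> canRead(existing, toValidate);
            // Forward: new schema writes, old schema reads.
            case CanBeReadByExistingStrategy -> canRead(toValidate, existing);
            // Full: both directions must hold.
            case CanBeReadMutualStrategy ->
                    canRead(existing, toValidate) && canRead(toValidate, existing);
        };
    }

    // Non-transitive strategies check only the latest existing schema;
    // transitive strategies check every existing schema.
    static boolean validateAll(Strategy strategy, boolean onlyValidateLatest,
                               List<Schema> existingNewestFirst, Schema toValidate) {
        List<Schema> targets = (onlyValidateLatest && !existingNewestFirst.isEmpty())
                ? existingNewestFirst.subList(0, 1)
                : existingNewestFirst;
        return targets.stream().allMatch(s -> validate(strategy, s, toValidate));
    }

    public static void main(String[] args) {
        Schema v1 = new Schema(Set.of(1), Set.of(1));
        Schema v2 = new Schema(Set.of(1, 2), Set.of(1, 2)); // adds required field 2
        System.out.println(validate(Strategy.CanReadExistingStrategy, v1, v2));
        System.out.println(validate(Strategy.CanBeReadByExistingStrategy, v1, v2));
        System.out.println(validate(Strategy.CanBeReadMutualStrategy, v1, v2));
    }
}
```

In this toy model, adding a required field fails the backward check (the new reader requires a field the old writer never set) but passes the forward check, so the full check fails as well.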
- The checkSchemaCompatibility() in ProtobufNativeSchemaBreakCheckUtils:
(1) Create:
- The writtenSchema cannot add required fields, but optional or repeated fields can be added (the field number must be new).
(2) Update:
- The writtenSchema must not change the field number of any field in readSchema (that is, the field name is the same but the field number is different).
- The writtenSchema must not change the type of a field whose name and number are unchanged, except between the following compatible types:
  - int32, uint32, int64, uint64, and bool are all compatible. This means you can change a field from one of these types to another without breaking forwards- or backwards-compatibility.
  - sint32 and sint64 are compatible with each other but are not compatible with the other integer types.
  - string and bytes are compatible as long as the bytes are valid UTF-8.
  - Embedded messages are compatible with bytes if the bytes contain an encoded version of the message.
  - fixed32 is compatible with sfixed32, and fixed64 with sfixed64.
  - enum is compatible with int32, uint32, int64, and uint64 in terms of wire format (note that values will be truncated if they don't fit).
(3) Delete:
- The writtenSchema cannot remove required fields in the readSchema or fields that do not have a default value.
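The create, update, and delete rules above can be sketched as a small checker over a simplified field model. This is one possible reading of the rules, not the proposed ProtobufNativeSchemaBreakCheckUtils: the `BreakCheckSketch` class, the `Field` record, the `hasDefault` flag, matching fields by name, and the string type names (including the placeholder "message" for embedded messages) are all hypothetical. The data-dependent caveats (valid UTF-8 bytes, bytes holding an encoded message, enum truncation) cannot be checked from the schema alone and are ignored here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: a simplified field model, not the Protobuf Descriptor API.
class BreakCheckSketch {
    enum Label { REQUIRED, OPTIONAL, REPEATED }

    record Field(String name, int number, String type, Label label, boolean hasDefault) {}

    // Compatible type groups, transcribed from the list above.
    static final List<Set<String>> COMPATIBLE_GROUPS = List.of(
            Set.of("int32", "uint32", "int64", "uint64", "bool"),
            Set.of("sint32", "sint64"),
            Set.of("string", "bytes"),
            Set.of("message", "bytes"),
            Set.of("fixed32", "sfixed32"),
            Set.of("fixed64", "sfixed64"),
            Set.of("enum", "int32", "uint32", "int64", "uint64"));

    static boolean typesCompatible(String a, String b) {
        return a.equals(b)
                || COMPATIBLE_GROUPS.stream().anyMatch(g -> g.contains(a) && g.contains(b));
    }

    // Returns a description of every rule that the pair of schemas violates.
    static List<String> check(List<Field> written, List<Field> read) {
        Map<String, Field> readByName = new HashMap<>();
        Set<Integer> readNumbers = new HashSet<>();
        for (Field f : read) {
            readByName.put(f.name(), f);
            readNumbers.add(f.number());
        }
        Set<String> writtenNames = new HashSet<>();
        List<String> violations = new ArrayList<>();
        for (Field w : written) {
            writtenNames.add(w.name());
            Field r = readByName.get(w.name());
            if (r == null) {
                // (1) Create: no required fields, and the field number must be new.
                if (w.label() == Label.REQUIRED) {
                    violations.add("added required field: " + w.name());
                }
                if (readNumbers.contains(w.number())) {
                    violations.add("added field reuses a field number: " + w.name());
                }
            } else if (r.number() != w.number()) {
                // (2) Update: same name, different number.
                violations.add("field number changed: " + w.name());
            } else if (!typesCompatible(w.type(), r.type())) {
                // (2) Update: same name and number, incompatible type.
                violations.add("field type changed incompatibly: " + w.name());
            }
        }
        for (Field r : read) {
            // (3) Delete: required fields and fields without a default must remain.
            if (!writtenNames.contains(r.name())
                    && (r.label() == Label.REQUIRED || !r.hasDefault())) {
                violations.add("removed required or default-less field: " + r.name());
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        List<Field> read = List.of(new Field("id", 1, "int32", Label.OPTIONAL, true));
        List<Field> written = List.of(new Field("id", 1, "sint32", Label.OPTIONAL, true));
        System.out.println(check(written, read));
    }
}
```

Returning a list of violations rather than throwing on the first match is a choice made for this sketch so that several rules can be inspected at once; the proposal itself signals incompatibility with ProtoBufCanReadCheckException.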
Alternatives
No response
Anything else?
No response
Links
Discussion: https://lists.apache.org/thread/c59qqzcf77w7gm9tq7thdmg0lt3qf5w8
Vote:
PR: apache#19566