External schema implementation for PIP-420 #33

Open
gaoran10 wants to merge 5 commits into master from pip-420-external-schema-2

@gaoran10
Owner

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@gaoran10 changed the title from "PIP-420 external schema 2" to "PIP-420 external schema implementation 2" on Jul 25, 2025
@gaoran10 changed the title from "PIP-420 external schema implementation 2" to "External schema implementation for PIP-420" on Jul 29, 2025

@AllArgsConstructor
@Data
public class EncodeData {

Pulsar 4.1.0 will require Java 17 for the client, so this class can be changed to a record type after apache#24475.

Owner Author

+1


public EncodeData(byte[] data) {
    this.data = data;
    this.schemaData = null;

@BewareMyPower BewareMyPower Jul 31, 2025

We'd better avoid storing a null byte array. Using new byte[0] might be better, since it avoids null checks on getSchemaId() and the schemaId fields.
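
A minimal sketch of this suggestion, combined with the record conversion proposed in the earlier comment. This is not the PR's actual class: the component names `data` and `schemaId` and the helper `hasSchemaId()` are assumptions for illustration. The compact constructor replaces a null array with an empty one, so callers never need a null check.

```java
// Hypothetical sketch, not the class from the PR.
// Assumes Java 17 (records) and that an empty array means "no schema id".
record EncodeData(byte[] data, byte[] schemaId) {
    private static final byte[] EMPTY = new byte[0];

    // Compact canonical constructor: normalize null arrays to empty ones.
    EncodeData {
        data = (data == null) ? EMPTY : data;
        schemaId = (schemaId == null) ? EMPTY : schemaId;
    }

    // Convenience constructor for payloads without an external schema id.
    EncodeData(byte[] data) {
        this(data, EMPTY);
    }

    // Hypothetical helper: true only when a non-empty schema id is present.
    boolean hasSchemaId() {
        return schemaId.length > 0;
    }
}
```

With this shape, `new EncodeData(payload).schemaId()` returns an empty array rather than null, so equality checks such as `Arrays.equals` work without special cases.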

return CompletableFuture.completedFuture(null);
}

schema.close();

We should add an asynchronous method to Schema. Otherwise, you have to handle the exception here and it could block the closeAsync() method.

Though Kafka's implementation only provides the synchronous method, it can be wrapped into an asynchronous method like:

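        final var future = new CompletableFuture<Void>();
        new Thread(() -> {
            try {
                kafkaSerializer.close();
                future.complete(null);
            } catch (Throwable t) {
                // Fail the future instead of leaving it incomplete forever
                future.completeExceptionally(t);
            }
        }).start();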

Owner Author

+1


// Indicate if the message partition key is set
optional bool null_partition_key = 30 [default = false];
optional bytes schema_id = 31;

Maybe schema_properties or schema_metadata would be a better name, since the external schema may store more than just the schema id here.

After discussing with @BewareMyPower, using schema_id also makes sense to me now, as it can be a bytes-typed identifier for the schema of each message.

}

@Override
public ClientBuilder schemaProperties(Map<String, String> schemaProperties) {

schemaProperties is set at the client level. But in Kafka, the schema config, such as the schema registry URL or auth, can be specified at the producer/consumer level. Would that be an issue?
We may need to consider moving it to the producer/consumer level.

Owner Author

@gaoran10 gaoran10 Aug 1, 2025

+1

Yes, we don't need to add this for the client. Maybe we can set the configuration while building the external schemas; the external schema needs an initialization process.

Owner Author

Delete the schema properties.

Comment on lines 372 to +377
if (!messageMetadata.hasSchemaVersion()) {
    return msg.getSchemaVersion() == null;
}
if (messageMetadata.hasSchemaId()) {
    return Arrays.equals(msg.getSchemaId(), messageMetadata.getSchemaId());
}

When users use the external schema, what would the schema version be?
We should still check schema id equality if the schema version is set to null.

Owner Author

+1

If using the external schema, the schema version will point to an external schema info or a key-value schema info.

We need to check both the schema version and schema ID.
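
A minimal sketch of checking both fields, assuming each one is represented as a nullable byte array. The class and method names are hypothetical and do not come from the PR. `Arrays.equals` treats two null arrays as equal and a null array as different from any non-null array, so the case where the schema version is unset is covered without an early return.

```java
import java.util.Arrays;

// Hypothetical helper, not the PR's implementation.
final class SchemaMatchSketch {
    private SchemaMatchSketch() {}

    // Returns true only when both the schema version and the schema id match.
    // A null array stands for "not set" in this sketch.
    static boolean hasSameSchema(byte[] batchVersion, byte[] batchSchemaId,
                                 byte[] msgVersion, byte[] msgSchemaId) {
        return Arrays.equals(batchVersion, msgVersion)
                && Arrays.equals(batchSchemaId, msgSchemaId);
    }
}
```

Under these assumptions, two messages with no schema version but different schema ids are reported as having different schemas, which is the case raised in the review comment.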

Comment on lines +140 to +142
if (!messageMetadata.hasSchemaId() && msg.getSchemaId() != null) {
    messageMetadata.setSchemaId(msg.getSchemaId());
}

It should be moved to the block of

        if (++numMessagesInBatch == 1) {

because we only need to set the schema id for the 1st message in the batch.

The schema id validation between messages in the same batch has already been performed in hasSameSchema, which is called from canAddToCurrentBatch.
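
A simplified sketch of that placement. `BatchSketch` is a hypothetical stand-in for the real batch container, and it assumes the caller has already confirmed that every added message has the same schema as the batch. Only the counter name `numMessagesInBatch` is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batch container, for illustration only.
final class BatchSketch {
    private final List<byte[]> payloads = new ArrayList<>();
    private int numMessagesInBatch = 0;
    private byte[] batchSchemaId; // stays null until the first message arrives

    // Assumes the caller already checked schema compatibility for this message.
    void add(byte[] payload, byte[] schemaId) {
        if (++numMessagesInBatch == 1) {
            // Batch-level metadata is taken from the first message only.
            batchSchemaId = schemaId;
        }
        payloads.add(payload);
    }

    byte[] batchSchemaId() {
        return batchSchemaId;
    }

    int size() {
        return numMessagesInBatch;
    }
}
```

Later messages skip the assignment entirely, so no per-message `hasSchemaId()` check is needed on the batch metadata.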

Owner Author

+1

*/
byte[] encode(T message);

default EncodeData encode(String topic, T message) {

There is a case where encode could fail because of a failure at the external schema registry. For example, if the message fails to register its schema, encode could throw an exception, which would eventually be propagated to sendAsync.

From the perspective of users, they will see sendAsync() throwing an exception, rather than a failed future being returned by sendAsync.

It's better to change the signature to

    default EncodeData encode(String topic, T message) throws PulsarClientException {

and catch the exception in sendAsync

    public CompletableFuture<MessageId> sendAsync() {
        Message<T> message;
        try {
            message = getMessage();
        } catch (PulsarClientException e) {
            return CompletableFuture.failedFuture(e);
        }

The decode method has the same issue.

Owner Author

+1

We can throw the SchemaSerializationException.

Owner Author

We should add a description of the SchemaSerializationException to the method comment; otherwise, we need to modify all encode and decode methods and their callers. The encode method already has a throws comment, so it may throw exceptions.

Comment on lines +428 to +431
if (msgMetadata.hasSchemaId()) {
    byte[] schemaId = msgMetadata.getSchemaId();
    return (schemaId.length == 0) ? null : schemaId;
}

We should not return the schema id from this method, because existing applications might already assume the schema version is a byte array that encodes a long (LongSchemaVersion).

In addition, the result of getSchemaVersion() might be passed to SchemaInfoProvider#getSchemaByVersion. If we're using schema id as the result, the result of getSchemaByVersion would be unexpected.

Owner Author

If the schema ID exists, it indicates that users use the external schema.

Owner Author

+1 add a private method

public void testExternalSchemaCompatibilityCheck(SchemaData schemaData) {
    try {
        externalSchemaCompatibilityCheck.checkCompatible(
                schemaData, externalSchemaData, SchemaCompatibilityStrategy.FULL);

You can test all strategies except for ALWAYS_COMPATIBLE

Comment on lines +141 to +147
default T decode(String topic, ByteBuffer data, byte[] schemaId) {
    return decode(topic, getBytes(data), schemaId);
}

default T decode(String topic, byte[] data, byte[] schemaId) {
    return decode(data, schemaId);
}

We should add tests verifying that if these new decode overloads throw an exception, the exception is propagated to Message#getValue.

Similar tests should be added for sendAsync with a custom encode implementation.

Owner Author

+1

*
* @return schema registry configurations
*/
default Map<String, String> getConfigs() {

In addition to the comment here, passing the properties through SchemaInfoProvider#getConfigs is not clear.

Actually, we should not call setSchemaInfoProvider when the schema type is External.

Owner Author

+1

3 participants