From 30664c6ede563deead0eb14b2e3b4ee63a37d6ee Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 9 Apr 2024 23:26:54 -0700 Subject: [PATCH 1/2] [improve][admin] Recover susbcription creation on the broken schema ledger topic --- .../pulsar/broker/service/ServerCnx.java | 4 +- .../org/apache/pulsar/schema/SchemaTest.java | 76 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 716f3a1a04c25..4ee6ac43465f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -26,6 +26,7 @@ import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync; import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; +import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer; @@ -1291,7 +1292,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .schemaType(schema == null ? null : schema.getType()) .build(); if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { - return topic.addSchemaIfIdleOrCheckCompatible(schema) + return ignoreUnrecoverableBKException + (topic.addSchemaIfIdleOrCheckCompatible(schema)) .thenCompose(v -> topic.subscribe(option)); } else { return topic.subscribe(option); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index aa47c378fc38c..d21e853ba0982 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Sets; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -58,6 +59,8 @@ import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl; +import org.apache.pulsar.broker.service.schema.SchemaStorageFormat; +import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -87,6 +90,9 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.Stat; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1410,4 +1416,74 @@ public User(String name) { } } + /** + * This test validates that consumer/producers should recover on topic whose + * schema ledgers are not able to open due to non-recoverable error. + * + * @throws Exception + */ + @Test + public void testDeletedSchemaLedgerRecovery() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-multi-version-schema-one"; + final String subName = "test"; + final String topicName = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString(); + + admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME)); + + // (1) create schema + Producer producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(topicName).create(); + + Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); + personTwo.setId(1); + personTwo.setName("Tom"); + + Consumer consumer = pulsarClient + .newConsumer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName(subName).topic(topicName).subscribe(); + + producer.send(personTwo); + producer.close(); + consumer.close(); + + // (2) Delete schema ledger + MetadataCache locatorEntryCache = pulsar.getLocalMetadataStore() + .getMetadataCache(new MetadataSerde() { + @Override + public byte[] serialize(String path, SchemaStorageFormat.SchemaLocator value) { + return value.toByteArray(); + } + + @Override + public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content, Stat stat) + throws IOException { + return SchemaStorageFormat.SchemaLocator.parseFrom(content); + } + }); + String path = "/schemas/public/" + namespace + "/test-multi-version-schema-one"; + SchemaLocator schema = locatorEntryCache.get(path).get().get(); + schema = locatorEntryCache.get(path).get().get(); + long ledgerId = schema.getInfo().getPosition().getLedgerId(); + pulsar.getBookKeeperClient().deleteLedger(ledgerId); + + // (3) Topic should recover from deleted schema and should allow to create consumer and producer + consumer = pulsarClient + .newConsumer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName(subName).topic(topicName).subscribe(); + + producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(topicName).create(); + assertNotNull(consumer); + assertNotNull(producer); + consumer.close(); + producer.close(); + } } From 325e3c6349a40fd22feaec036d05cf8fceeba69d Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 10 Apr 2024 15:27:27 -0700 Subject: [PATCH 2/2] Fix test --- .../pulsar/broker/service/schema/BookkeeperSchemaStorage.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index c509764bf6710..acdd906f6b8af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -52,6 +52,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -716,6 +717,7 @@ public static CompletableFuture ignoreUnrecoverableBKException(Completabl return source.exceptionally(t -> { if (t.getCause() != null && (t.getCause() instanceof SchemaException) + && !(t.getCause() instanceof IncompatibleSchemaException) && !((SchemaException) t.getCause()).isRecoverable()) { // Meeting NoSuchLedgerExistsException, NoSuchEntryException or // NoSuchLedgerExistsOnMetadataServerException when reading schemas in