diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index acdd906f6b8af..3920b866b1dc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -23,11 +23,14 @@ import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; +import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -54,6 +57,8 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; +import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.protocol.schema.StoredSchema; @@ -275,6 +280,123 @@ public SchemaVersion versionFromBytes(byte[] version) { return new LongSchemaVersion(bb.getLong()); } + @Override + public CompletableFuture tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version, + SchemaData schema) { + CompletableFuture promise = new CompletableFuture<>(); + tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise); + return promise; + } + + private void tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema, + CompletableFuture promise) { + CompletableFuture> schemasWithLocator = getLocator(schemaId); + schemasWithLocator.thenCompose(optEntry -> { + if (optEntry.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + LocatorEntry entry = optEntry.get(); + LongSchemaVersion longVersion = (LongSchemaVersion) version; + Optional optOldIndexEntry = entry.locator.getIndexList() + .stream() + .filter(indexEntry -> indexEntry.getVersion() == longVersion.getVersion()) + .findFirst(); + + if (optOldIndexEntry.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + SchemaStorageFormat.IndexEntry oldIndexEntry = optOldIndexEntry.get(); + byte[] hash = SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(SchemaRegistryServiceImpl.Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(Clock.systemUTC().millis()) + .addAllProps(toPairs(schema.getProps())) + .build() + .toByteArray(); + + return createLedger(schemaId).thenCompose(ledgerHandle -> { + final long newLedgerId = ledgerHandle.getId(); + + SchemaStorageFormat.IndexEntry index = SchemaStorageFormat.IndexEntry.newBuilder() + .setVersion(oldIndexEntry.getVersion()) + .setHash(copyFrom(oldIndexEntry.getHash().toByteArray())) + .setPosition(SchemaStorageFormat.PositionInfo.newBuilder() + .setEntryId(oldIndexEntry.getPosition().getEntryId()) + .setLedgerId(newLedgerId)) + .build(); + + SchemaStorageFormat.SchemaEntry schemaEntry = SchemaStorageFormat.SchemaEntry.newBuilder() + .setSchemaData(copyFrom(hash)) + .addAllIndex(newArrayList(index)) + .build(); + + return addEntry(ledgerHandle, schemaEntry) + .thenApply(entryId -> { + ledgerHandle.closeAsync(); + return Functions.newPositionInfo(newLedgerId, entryId); + }) + .thenCompose(position -> updateExistsLocatorWithNewLedgerId(schemaId, position, + entry, longVersion, index)) + .whenComplete((infoVersion, ex) -> { + if (ex == null) { + promise.complete(infoVersion); + } else { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + bookKeeper.asyncDeleteLedger(newLedgerId, + new AsyncCallback.DeleteCallback() { + @Override + public void deleteComplete(int rc, Object ctx) { + if (rc != BKException.Code.OK) { + log.warn("[{}] Failed to delete ledger {} after updating" + + " exists schema locator with new ledgerId" + + " failed, rc: {}", schemaId, newLedgerId, rc); + } + } + }, null); + tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise); + } else { + promise.completeExceptionally(ex); + } + } + }); + }); + }); + } + + private CompletableFuture updateExistsLocatorWithNewLedgerId(String schemaId, + SchemaStorageFormat.PositionInfo position, + LocatorEntry entry, + LongSchemaVersion longVersion, + SchemaStorageFormat.IndexEntry newIndexEntry) { + long infoVersion = entry.locator.getInfo().getVersion(); + + SchemaStorageFormat.SchemaLocator locator = entry.locator; + SchemaStorageFormat.IndexEntry info = + SchemaStorageFormat.IndexEntry.newBuilder() + .setVersion(infoVersion) + .setPosition(position) + .setHash(copyFrom(entry.locator.getInfo().getHash().toByteArray())) + .build(); + + final List indexList = locator.getIndexList().stream() + .map(indexEntry -> indexEntry.getVersion() == longVersion.getVersion() ? newIndexEntry : indexEntry) + .collect(Collectors.toList()); + + return updateSchemaLocator(getSchemaPath(schemaId), + SchemaStorageFormat.SchemaLocator.newBuilder() + .setInfo(info) + .addAllIndex(indexList) + .build() + , entry.version + ).thenApply(ignore -> infoVersion); + } + @Override public void close() throws Exception { if (bookKeeper != null) { @@ -676,6 +798,10 @@ static SchemaStorageFormat.SchemaEntry newSchemaEntry( List index, byte[] data ) { + for (SchemaStorageFormat.IndexEntry indexEntry : index) { + log.warn("newSchemaEntry: " + indexEntry.getPosition().getLedgerId() + " - " + + indexEntry.getPosition().getEntryId()); + } return SchemaStorageFormat.SchemaEntry.newBuilder() .setSchemaData(copyFrom(data)) .addAllIndex(index) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index ae56df248d85d..195284deaac01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -30,6 +30,7 @@ import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.nio.ByteBuffer; import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; @@ -67,6 +68,8 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { private final SchemaStorage schemaStorage; private final Clock clock; private final SchemaRegistryStats stats; + private static final String COMPLEMENT_SCHEMA_ENABLE = "complementSchemaEnabled"; + private static final String COMPLEMENT_SCHEMA_VERSION = "complementSchemaVersion"; @VisibleForTesting SchemaRegistryServiceImpl(SchemaStorage schemaStorage, @@ -188,65 +191,78 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem SchemaCompatibilityStrategy strategy) { MutableLong start = new MutableLong(0); CompletableFuture promise = new CompletableFuture<>(); - schemaStorage.put(schemaId, - schemasFuture -> schemasFuture - .thenCompose(schemaFutureList -> trimDeletedSchemaAndGetList(schemaId, - convertToSchemaAndMetadata(schemaId, schemaFutureList))) - .thenCompose(schemaAndMetadataList -> getSchemaVersionBySchemaData(schemaAndMetadataList, schema) - .thenCompose(schemaVersion -> { - if (schemaVersion != null) { - if (log.isDebugEnabled()) { - log.debug("[{}] Schema is already exists", schemaId); - } - promise.complete(schemaVersion); - return CompletableFuture.completedFuture(null); - } - CompletableFuture checkCompatibilityFuture = new CompletableFuture<>(); - if (schemaAndMetadataList.size() != 0) { - if (isTransitiveStrategy(strategy)) { - checkCompatibilityFuture = - checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList); - } else { - checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy); - } - } else { - checkCompatibilityFuture.complete(null); - } - return checkCompatibilityFuture.thenCompose(v -> { - byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); - SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() - .setType(Functions.convertFromDomainType(schema.getType())) - .setSchema(ByteString.copyFrom(schema.getData())) - .setSchemaId(schemaId) - .setUser(schema.getUser()) - .setDeleted(false) - .setTimestamp(clock.millis()) - .addAllProps(toPairs(schema.getProps())) - .build(); - - start.setValue(this.clock.millis()); - return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context)); - }); - }))).whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Put schema failed", schemaId, ex); + if (schema.getProps().containsKey(COMPLEMENT_SCHEMA_ENABLE) + && schema.getProps().containsKey(COMPLEMENT_SCHEMA_VERSION)) { + ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); + bbVersion.putLong(Long.parseLong(schema.getProps().get(COMPLEMENT_SCHEMA_VERSION))); + SchemaVersion schemaVersion = versionFromBytes(bbVersion.array()); + tryComplementTheLostSchema(schemaId, schemaVersion, schema, promise); + } else { + schemaStorage.put(schemaId, + schemasFuture -> schemasFuture + .thenCompose(schemaFutureList -> trimDeletedSchemaAndGetList(schemaId, + convertToSchemaAndMetadata(schemaId, schemaFutureList))) + .thenCompose(schemaAndMetadataList -> + getSchemaVersionBySchemaData(schemaAndMetadataList, schema) + .thenCompose(schemaVersion -> { + if (schemaVersion != null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Schema is already exists", schemaId); + } + promise.complete(schemaVersion); + return CompletableFuture.completedFuture(null); + } + CompletableFuture checkCompatibilityFuture = new CompletableFuture<>(); + if (schemaAndMetadataList.size() != 0) { + if (isTransitiveStrategy(strategy)) { + checkCompatibilityFuture = + checkCompatibilityWithAll(schemaId, schema, strategy, + schemaAndMetadataList); + } else { + checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, + schema, strategy); + } + } else { + checkCompatibilityFuture.complete(null); + } + return checkCompatibilityFuture.thenCompose(v -> { + byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); + SchemaRegistryFormat.SchemaInfo info = + SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(clock.millis()) + .addAllProps(toPairs(schema.getProps())) + .build(); + + start.setValue(this.clock.millis()); + return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), + context)); + }); + }))).whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Put schema failed", schemaId, ex); + if (start.getValue() != 0) { + this.stats.recordPutFailed(schemaId); + } + promise.completeExceptionally(ex); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Put schema finished", schemaId); + } + // The schema storage will return null schema version if no schema is persisted to the storage + if (v != null) { + promise.complete(v); if (start.getValue() != 0) { - this.stats.recordPutFailed(schemaId); - } - promise.completeExceptionally(ex); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] Put schema finished", schemaId); - } - // The schema storage will return null schema version if no schema is persisted to the storage - if (v != null) { - promise.complete(v); - if (start.getValue() != 0) { - this.stats.recordPutLatency(schemaId, this.clock.millis() - start.getValue()); - } + this.stats.recordPutLatency(schemaId, this.clock.millis() - start.getValue()); } } + } }); + } return promise; } @@ -445,6 +461,26 @@ public CompletableFuture getSchemaVersionBySchemaData( return completableFuture; } + private void tryComplementTheLostSchema(String schemaId, SchemaVersion schemaVersion, + SchemaData schema, + CompletableFuture promise) { + schemaStorage + .tryComplementTheLostSchemaLedger(schemaId, schemaVersion, schema) + .whenComplete((v, t) -> { + if (t != null) { + log.error("[{}] Complete lost schema({}) failed", schemaId, + ((LongSchemaVersion) schemaVersion).getVersion()); + promise.completeExceptionally(t); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Complete lost schema({}) finished", schemaId, + ((LongSchemaVersion) schemaVersion).getVersion()); + } + promise.complete(schemaVersion); + } + }); + } + private CompletableFuture checkCompatibilityWithLatest(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) { @@ -561,13 +597,7 @@ private CompletableFuture> trimDeletedSchemaAndGetList(S } }); trimDeletedSchemaAndGetList(list); - // clean up the broken schema from zk - deleteSchemaStorage(schemaId, true).handle((sv, th) -> { - log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", rc.getMessage(), - schemaId, (th == null ? "successful" : "failed, " + th.getCause().getMessage())); - schemaResult.complete(list); - return null; - }); + schemaResult.complete(list); return null; } // trim the deleted schema and return the result if schema is retrieved successfully diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index 3a4016eb79c21..ec4b4ca42880c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -25,6 +25,7 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Multimap; import com.google.common.hash.HashFunction; @@ -35,6 +36,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -45,14 +47,27 @@ import java.util.concurrent.ExecutionException; import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.BrokerTestUtil; +import java.util.concurrent.TimeUnit; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Cleanup; +import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; +import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -60,9 +75,11 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -91,13 +108,15 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest { private static final SchemaData schemaData3 = getSchemaData(schemaJson3); private SchemaRegistryServiceImpl schemaRegistryService; + private BookkeeperSchemaStorage storage; @BeforeMethod @Override protected void setup() throws Exception { conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"); super.internalSetup(); - BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); + super.setupDefaultTenantAndNamespace(); + storage = new BookkeeperSchemaStorage(pulsar); storage.start(); Map checkMap = new HashMap<>(); checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck()); @@ -336,6 +355,175 @@ public void testSchemaStorageFailed() throws Exception { } } + @Test(dataProvider = "lostSchemaLedgerIndexes", timeOut = 30000) + public void testSchemaLedgerLost(List lostSchemaLedgerIndexes) throws Exception { + final String namespace = "public/default"; + final String topic = namespace + "/testSchemaLedgerLost"; + final Schema schemaV1 = Schema.AVRO(V1Data.class); + final Schema schemaV2 = Schema.AVRO(V2Data.class); + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().setSchemaValidationEnforced(topic, true); + + Producer producer1 = pulsarClient.newProducer(schemaV1) + .topic(topic) + .producerName("producer1") + .create(); + Producer producer2 = pulsarClient.newProducer(schemaV2) + .topic(topic) + .producerName("producer2") + .create(); + Consumer consumer1 = pulsarClient.newConsumer(schemaV1) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub0") + .consumerName("consumer1") + .subscribe(); + + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); + + // delete ledger + String key = TopicName.get(topic).getSchemaName(); + List schemaLedgerList = storage.getSchemaLedgerList(key); + Assert.assertEquals(schemaLedgerList.size(), 2); + for (int i = 0; i < schemaLedgerList.size(); i++){ + if (lostSchemaLedgerIndexes.contains(i)){ + storage.getBookKeeper().deleteLedger(schemaLedgerList.get(i)); + } + } + + // Without introducing this pr, connected producers or consumers are not affected if the schema ledger is lost + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer1.send(new V1Data(i)); + producer2.send(new V2Data(i, i + 1)); + } + for (int i = 0; i < numMessages; i++) { + Message msg = consumer1.receive(3, TimeUnit.SECONDS); + consumer1.acknowledge(msg); + } + + // try to complement the lost schema ledger + PostSchemaPayload v1Payload = new PostSchemaPayload(); + v1Payload.setType("AVRO"); + PostSchemaPayload v2Payload = new PostSchemaPayload(); + v2Payload.setType("AVRO"); + Map v1Properties = new HashMap<>(); + v1Properties.put("complementSchemaEnabled", "true"); + v1Properties.put("complementSchemaVersion", "0"); + Map v2Properties = new HashMap<>(); + v2Properties.put("complementSchemaEnabled", "true"); + v2Properties.put("complementSchemaVersion", "1"); + + if (lostSchemaLedgerIndexes.contains(0) && lostSchemaLedgerIndexes.size() == 1) { + SchemaInfo schemaInfo = schemaV1.getSchemaInfo(); + v1Payload.setSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8)); + v1Properties.putAll(schemaInfo.getProperties()); + v1Payload.setProperties(v1Properties); + admin.schemas().createSchema(topic, v1Payload); + } else if (lostSchemaLedgerIndexes.contains(1) && lostSchemaLedgerIndexes.size() == 1) { + SchemaInfo schemaInfo = schemaV2.getSchemaInfo(); + v2Payload.setSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8)); + v2Properties.putAll(schemaInfo.getProperties()); + v2Payload.setProperties(v2Properties); + admin.schemas().createSchema(topic, v2Payload); + } else if (lostSchemaLedgerIndexes.size() == 2) { + SchemaInfo v1SchemaInfo = schemaV1.getSchemaInfo(); + v1Payload.setSchema(new String(v1SchemaInfo.getSchema(), StandardCharsets.UTF_8)); + v1Properties.putAll(v1SchemaInfo.getProperties()); + v1Payload.setProperties(v1Properties); + admin.schemas().createSchema(topic, v1Payload); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(1, admin.schemas().getAllSchemas(topic).size()); + }); + + SchemaInfo v2SchemaInfo = schemaV2.getSchemaInfo(); + v2Payload.setSchema(new String(v2SchemaInfo.getSchema(), StandardCharsets.UTF_8)); + v2Properties.putAll(v2SchemaInfo.getProperties()); + v2Payload.setProperties(v2Properties); + admin.schemas().createSchema(topic, v2Payload); + Awaitility.await().untilAsserted(() -> { + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); + }); + } + + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); + + @Cleanup + Producer producerAfterLostLedger2 = pulsarClient.newProducer(schemaV2) + .topic(topic) + .producerName("producerAfterLostLedger2") + .create(); + assertNotNull(producerAfterLostLedger2.send(new V2Data(10, 10))); + @Cleanup + Producer producerAfterLostLedger1 = pulsarClient.newProducer(schemaV1) + .topic(topic) + .producerName("producerAfterLostLedger1") + .create(); + assertNotNull(producerAfterLostLedger1.send(new V1Data(10))); + + @Cleanup + Consumer consumerAfterLostLedger1 = pulsarClient.newConsumer(schemaV1) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub1") + .consumerName("consumerAfterLostLedger1") + .subscribe(); + producer1.send(new V1Data(11)); + assertNotNull(consumerAfterLostLedger1.receive(3, TimeUnit.SECONDS)); + + @Cleanup + Consumer consumerAfterLostLedger2 = pulsarClient.newConsumer(schemaV2) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub0") + .consumerName("consumerAfterLostLedger2") + .subscribe(); + producer2.send(new V2Data(11, 11)); + assertNotNull(consumerAfterLostLedger2.receive(3, TimeUnit.SECONDS)); + + for (int i = 0; i < numMessages; i++) { + producer1.send(new V1Data(i)); + producer2.send(new V2Data(i, i + 1)); + } + for (int i = 0; i < numMessages; i++) { + Message msg = consumer1.receive(3, TimeUnit.SECONDS); + consumer1.acknowledge(msg); + } + + assertEquals(2, admin.schemas().getAllSchemas(topic).size()); + + producer1.close(); + producer2.close(); + consumer1.close(); + } + + @DataProvider(name = "lostSchemaLedgerIndexes") + public Object[][] lostSchemaLedgerIndexes(){ + return new Object[][]{ + {Arrays.asList(0,1)}, + {Arrays.asList(0)}, + {Arrays.asList(1)} + }; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + static class V1Data { + int i; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + static class V2Data { + int i; + Integer j; + } + private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception { putSchema(schemaId, schema, expectedVersion, SchemaCompatibilityStrategy.FULL); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index 9cdc85defd499..f6d33c798f4b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -54,6 +54,8 @@ default CompletableFuture put(String key, SchemaVersion versionFromBytes(byte[] version); + CompletableFuture tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema); + void start() throws Exception; void close() throws Exception;