Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -54,6 +57,8 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
Expand Down Expand Up @@ -275,6 +280,123 @@ public SchemaVersion versionFromBytes(byte[] version) {
return new LongSchemaVersion(bb.getLong());
}

@Override
public CompletableFuture<Long> tryComplementTheLostSchemaLedger(String schemaId, SchemaVersion version,
SchemaData schema) {
CompletableFuture<Long> promise = new CompletableFuture<>();
tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise);
return promise;
}

private void tryCompleteTheLostSchemaLedger(String schemaId, SchemaVersion version, SchemaData schema,
CompletableFuture<Long> promise) {
CompletableFuture<Optional<LocatorEntry>> schemasWithLocator = getLocator(schemaId);
schemasWithLocator.thenCompose(optEntry -> {
if (optEntry.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

LocatorEntry entry = optEntry.get();
LongSchemaVersion longVersion = (LongSchemaVersion) version;
Optional<SchemaStorageFormat.IndexEntry> optOldIndexEntry = entry.locator.getIndexList()
.stream()
.filter(indexEntry -> indexEntry.getVersion() == longVersion.getVersion())
.findFirst();

if (optOldIndexEntry.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

SchemaStorageFormat.IndexEntry oldIndexEntry = optOldIndexEntry.get();
byte[] hash = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(SchemaRegistryServiceImpl.Functions.convertFromDomainType(schema.getType()))
.setSchema(ByteString.copyFrom(schema.getData()))
.setSchemaId(schemaId)
.setUser(schema.getUser())
.setDeleted(false)
.setTimestamp(Clock.systemUTC().millis())
.addAllProps(toPairs(schema.getProps()))
.build()
.toByteArray();

return createLedger(schemaId).thenCompose(ledgerHandle -> {
final long newLedgerId = ledgerHandle.getId();

SchemaStorageFormat.IndexEntry index = SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(oldIndexEntry.getVersion())
.setHash(copyFrom(oldIndexEntry.getHash().toByteArray()))
.setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
.setEntryId(oldIndexEntry.getPosition().getEntryId())
.setLedgerId(newLedgerId))
.build();

SchemaStorageFormat.SchemaEntry schemaEntry = SchemaStorageFormat.SchemaEntry.newBuilder()
.setSchemaData(copyFrom(hash))
.addAllIndex(newArrayList(index))
.build();

return addEntry(ledgerHandle, schemaEntry)
.thenApply(entryId -> {
ledgerHandle.closeAsync();
return Functions.newPositionInfo(newLedgerId, entryId);
})
.thenCompose(position -> updateExistsLocatorWithNewLedgerId(schemaId, position,
entry, longVersion, index))
.whenComplete((infoVersion, ex) -> {
if (ex == null) {
promise.complete(infoVersion);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
bookKeeper.asyncDeleteLedger(newLedgerId,
new AsyncCallback.DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete ledger {} after updating"
+ " exists schema locator with new ledgerId"
+ " failed, rc: {}", schemaId, newLedgerId, rc);
}
}
}, null);
tryCompleteTheLostSchemaLedger(schemaId, version, schema, promise);
} else {
promise.completeExceptionally(ex);
}
}
});
});
});
}

private CompletableFuture<Long> updateExistsLocatorWithNewLedgerId(String schemaId,
SchemaStorageFormat.PositionInfo position,
LocatorEntry entry,
LongSchemaVersion longVersion,
SchemaStorageFormat.IndexEntry newIndexEntry) {
long infoVersion = entry.locator.getInfo().getVersion();

SchemaStorageFormat.SchemaLocator locator = entry.locator;
SchemaStorageFormat.IndexEntry info =
SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(infoVersion)
.setPosition(position)
.setHash(copyFrom(entry.locator.getInfo().getHash().toByteArray()))
.build();

final List<SchemaStorageFormat.IndexEntry> indexList = locator.getIndexList().stream()
.map(indexEntry -> indexEntry.getVersion() == longVersion.getVersion() ? newIndexEntry : indexEntry)
.collect(Collectors.toList());

return updateSchemaLocator(getSchemaPath(schemaId),
SchemaStorageFormat.SchemaLocator.newBuilder()
.setInfo(info)
.addAllIndex(indexList)
.build()
, entry.version
).thenApply(ignore -> infoVersion);
}

@Override
public void close() throws Exception {
if (bookKeeper != null) {
Expand Down Expand Up @@ -676,6 +798,10 @@ static SchemaStorageFormat.SchemaEntry newSchemaEntry(
List<SchemaStorageFormat.IndexEntry> index,
byte[] data
) {
for (SchemaStorageFormat.IndexEntry indexEntry : index) {
log.warn("newSchemaEntry: " + indexEntry.getPosition().getLedgerId() + " - "
+ indexEntry.getPosition().getEntryId());
}
return SchemaStorageFormat.SchemaEntry.newBuilder()
.setSchemaData(copyFrom(data))
.addAllIndex(index)
Expand Down
Loading