diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 9f3fe9bb0c4a7..19d5744498d84 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -215,7 +215,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, metadataStore.registerSessionListener(this::handleMetadataStoreNotification); } - static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { + public static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { private final BookKeeper bkClient; diff --git a/pulsar-sql/presto-distribution/src/main/resources/conf/jvm.config b/pulsar-sql/presto-distribution/src/main/resources/conf/jvm.config index 86c9d0613b233..71e9ec1273bc0 100644 --- a/pulsar-sql/presto-distribution/src/main/resources/conf/jvm.config +++ b/pulsar-sql/presto-distribution/src/main/resources/conf/jvm.config @@ -25,4 +25,4 @@ -XX:+HeapDumpOnOutOfMemoryError -XX:+ExitOnOutOfMemoryError -Dpresto-temporarily-allow-java8=true --Djdk.attach.allowAttachSelf=true +-Djdk.attach.allowAttachSelf=true \ No newline at end of file diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 20b00b59e5ab8..1831a4d9fb575 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; import 
org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -35,6 +36,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.Offloaders; import org.apache.bookkeeper.mledger.offload.OffloadersCache; @@ -68,14 +70,17 @@ public class PulsarConnectorCache { private LedgerOffloader defaultOffloader; private Map offloaderMap = new ConcurrentHashMap<>(); + @Getter + private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperFactory; + private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory"; private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver"; private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads"; - private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getMetadataUrl(), MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); + this.bookKeeperFactory = initBookKeeperFactory(pulsarConnectorConfig); this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig); this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(), StatsProvider.class, getClass().getClassLoader()); @@ -108,27 +113,32 @@ public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsa return instance; } - private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) - throws 
Exception { + private BookkeeperFactoryForCustomEnsemblePlacementPolicy initBookKeeperFactory( + PulsarConnectorConfig pulsarConnectorConfig) throws Exception { PulsarMetadataClientDriver.init(); ClientConfiguration bkClientConfiguration = new ClientConfiguration() - .setMetadataServiceUri("metadata-store:" + pulsarConnectorConfig.getMetadataUrl()) - .setClientTcpNoDelay(false) - .setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol()) - .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval()) - .setStickyReadsEnabled(false) - .setReadEntryTimeout(60) - .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()) - .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()) - .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()) - .setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING); + .setMetadataServiceUri("metadata-store:" + pulsarConnectorConfig.getMetadataUrl()) + .setClientTcpNoDelay(false) + .setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol()) + .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval()) + .setStickyReadsEnabled(false) + .setReadEntryTimeout(60) + .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()) + .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()) + .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()) + .setNettyMaxFrameSizeBytes( + pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING); + return new ManagedLedgerFactoryImpl.DefaultBkFactory(bkClientConfiguration); + } + private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) + throws Exception { ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB()); 
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads( pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads()); - return new ManagedLedgerFactoryImpl(metadataStore, bkClientConfiguration, managedLedgerFactoryConfig); + return new ManagedLedgerFactoryImpl(metadataStore, bookKeeperFactory, managedLedgerFactoryConfig); } public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPoliciesImpl offloadPolicies, @@ -206,6 +216,7 @@ public static void shutdown() throws Exception { if (instance != null) { instance.statsProvider.stop(); instance.managedLedgerFactory.shutdown(); + instance.bookKeeperFactory.get().close(); instance.metadataStore.close(); instance.offloaderScheduler.shutdown(); instance.offloadersCache.close(); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java index a0812085f04e1..f33eb9be5f8fd 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.TimestampType; import io.trino.spi.type.Type; @@ -62,8 +63,11 @@ public class PulsarInternalColumn { public static final PulsarInternalColumn PROPERTIES = new PulsarInternalColumn("__properties__", VarcharType.VARCHAR, "User defined properties"); + public static final PulsarInternalColumn COMPACTED_QUERY = new PulsarInternalColumn("__compacted_query__", + BooleanType.BOOLEAN, "Compacted query flag"); + private static Set internalFields = ImmutableSet.of(PARTITION, EVENT_TIME, PUBLISH_TIME, - MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, 
KEY, PROPERTIES); + MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY, PROPERTIES, COMPACTED_QUERY); private final String name; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index f86335ae3780b..8645696d3c003 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -20,8 +20,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.decoder.FieldValueProviders.booleanValueProvider; import static io.trino.decoder.FieldValueProviders.bytesValueProvider; import static io.trino.decoder.FieldValueProviders.longValueProvider; +import static org.apache.pulsar.sql.presto.PulsarInternalColumn.COMPACTED_QUERY; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -38,7 +40,9 @@ import io.trino.spi.connector.RecordCursor; import io.trino.spi.type.Type; import java.io.IOException; +import java.util.Enumeration; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,6 +52,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntries; import 
org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -59,14 +70,24 @@ import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; +import org.apache.pulsar.common.api.proto.CompressionType; +import org.apache.pulsar.common.api.proto.MessageIdData; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.api.raw.MessageParser; import org.apache.pulsar.common.api.raw.RawMessage; import org.apache.pulsar.common.api.raw.RawMessageIdImpl; import org.apache.pulsar.common.api.raw.RawMessageImpl; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -85,6 +106,7 @@ */ public class PulsarRecordCursor implements RecordCursor { + @Getter private List columnHandles; private PulsarSplit pulsarSplit; private PulsarConnectorConfig pulsarConnectorConfig; @@ -123,6 +145,18 @@ public class PulsarRecordCursor implements RecordCursor { protected ConcurrentOpenHashMap chunkedMessagesMap = ConcurrentOpenHashMap.newBuilder().build(); + private OffloadPoliciesImpl 
offloadPolicies; + + private BookKeeper bookKeeper; + private ManagedLedgerFactory managedLedgerFactory; + private ManagedLedgerConfig managedLedgerConfig; + + private CompactedLedgerReader compactedLedgerReader; + private volatile Throwable compactedHandleError; + private final boolean isCompactedQuery; + protected ConcurrentOpenHashMap compactedMessageIds = + ConcurrentOpenHashMap.newBuilder().build(); + private static final Logger log = Logger.get(PulsarRecordCursor.class); public PulsarRecordCursor(List columnHandles, PulsarSplit pulsarSplit, @@ -140,21 +174,23 @@ public PulsarRecordCursor(List columnHandles, PulsarSplit pu throw new RuntimeException(e); } - OffloadPoliciesImpl offloadPolicies = pulsarSplit.getOffloadPolicies(); + offloadPolicies = pulsarSplit.getOffloadPolicies(); if (offloadPolicies != null) { offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory()); offloadPolicies.setManagedLedgerOffloadMaxThreads( pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads()); } - initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, - pulsarConnectorCache.getManagedLedgerFactory(), - pulsarConnectorCache.getManagedLedgerConfig( - TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), - pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies, - pulsarConnectorConfig), + // Offload policies are resolved above so that getManagedLedgerConfig() receives them instead of null. + bookKeeper = pulsarConnectorCache.getBookKeeperFactory().get(); + managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory(); + managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig( + TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), + pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies, + pulsarConnectorConfig); + this.isCompactedQuery = pulsarSplit.isCompactedQuery(); + initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
this.decoderFactory = decoderFactory; - initEntryCacheSizeAllocator(pulsarConnectorConfig); } // Exposed for testing purposes @@ -163,11 +199,26 @@ public PulsarRecordCursor(List columnHandles, PulsarSplit pu PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker, PulsarDispatchingRowDecoderFactory decoderFactory) { this.splitSize = pulsarSplit.getSplitSize(); + this.isCompactedQuery = pulsarSplit.isCompactedQuery(); initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker); this.decoderFactory = decoderFactory; } + PulsarRecordCursor(List columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig + pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, + PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker, + PulsarDispatchingRowDecoderFactory decoderFactory, BookKeeper bookKeeper) { + this.splitSize = pulsarSplit.getSplitSize(); + this.isCompactedQuery = pulsarSplit.isCompactedQuery(); + this.bookKeeper = bookKeeper; + this.managedLedgerFactory = managedLedgerFactory; + this.managedLedgerConfig = managedLedgerConfig; + initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, + pulsarConnectorMetricsTracker); + this.decoderFactory = decoderFactory; + } + private void initialize(List columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) { @@ -198,9 +249,11 @@ private void initialize(List columnHandles, PulsarSplit puls log.info("Initializing split with parameters: %s", pulsarSplit); try { - this.cursor = getCursor(TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), - pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig); - } catch 
(ManagedLedgerException | InterruptedException e) { + if (!isCompactedQuery) { + this.cursor = getCursor(topicName, + pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig); + } + } catch (Exception e) { log.error(e, "Failed to get read only cursor"); close(); throw new RuntimeException(e); @@ -306,15 +359,27 @@ public void accept(Entry entry) { message = null; } if (message != null) { - while (true) { - if (!haveAvailableCacheSize( - messageQueueCacheSizeAllocator, messageQueue) - || !messageQueue.offer(message)) { - Thread.sleep(1); - } else { - messageQueueCacheSizeAllocator.allocate( - message.getData().readableBytes()); - break; + if (isCompactedQuery && message.getKey().isPresent()) { + RawMessageIdImpl messageId = + (RawMessageIdImpl) message.getMessageId(); + cacheCompactedMessageIds(message.getKey().get(), + new BatchMessageIdImpl( + messageId.getLedgerId(), + messageId.getEntryId(), + partition, + (int) messageId.getBatchIndex()), + message.getData().readableBytes()); + } else { + while (true) { + if (!haveAvailableCacheSize( + messageQueueCacheSizeAllocator, messageQueue) + || !messageQueue.offer(message)) { + Thread.sleep(1); + } else { + messageQueueCacheSizeAllocator.allocate( + message.getData().readableBytes()); + break; + } } } } @@ -391,7 +456,8 @@ public void run() { ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor); // check if ledger is offloaded - if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) { + if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo() != null + && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) { log.warn( "Ledger %s is offloaded for topic %s. 
Ignoring it because offloader is not configured", readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName()); @@ -480,16 +546,27 @@ private boolean haveAvailableCacheSize(CacheSizeAllocator cacheSizeAllocator, Sp @Override public boolean advanceNextPosition() { - if (readEntries == null) { - // start deserialize thread - deserializeEntries = new DeserializeEntries(); - deserializeEntries.setUncaughtExceptionHandler((t, ex) -> { - deserializingError = ex; - }); - deserializeEntries.start(); - - readEntries = new ReadEntries(); - readEntries.run(); + if (isCompactedQuery) { + if (compactedLedgerReader == null) { + compactedLedgerReader = new CompactedLedgerReader(); + Thread compactedHandleThread = new Thread(compactedLedgerReader); // run the instance close() releases + compactedHandleThread.setUncaughtExceptionHandler((t, ex) -> { + compactedHandleError = ex; + }); + compactedHandleThread.start(); + } + } else { + if (readEntries == null) { + // start deserialize thread + deserializeEntries = new DeserializeEntries(); + deserializeEntries.setUncaughtExceptionHandler((t, ex) -> { + deserializingError = ex; + }); + deserializeEntries.start(); + + readEntries = new ReadEntries(); + readEntries.run(); + } } if (currentMessage != null) { @@ -498,12 +575,14 @@ public boolean advanceNextPosition() { } while (true) { - if (readEntries.hasFinished()) { + if (readEntries != null && readEntries.hasFinished() && compactedMessageIds.isEmpty()) { return false; } if ((messageQueue.capacity() - messageQueue.size()) > 0) { - readEntries.run(); + if (readEntries != null) { + readEntries.run(); + } } currentMessage = messageQueue.poll(); @@ -512,11 +591,19 @@ public boolean advanceNextPosition() { break; } else if (deserializingError != null) { throw new RuntimeException(deserializingError); + } else if (compactedHandleError != null) { + throw new RuntimeException(compactedHandleError); } else { try { - Thread.sleep(1); + long waitMills = 1; + if (isCompactedQuery) { + // Wait more time
when executing compacted query, + // because compacted query requires read all data to get the latest data of the key. + waitMills = 10; + } + Thread.sleep(waitMills); // stats for time spent wait to read from message queue because its empty - metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(1); + metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(waitMills); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -589,9 +676,11 @@ public boolean advanceNextPosition() { .filter(col -> PulsarColumnHandle.HandleKeyValueType.NONE .equals(col.getHandleKeyValueType())) .collect(toImmutableSet())); - Optional> decodedValue = - messageDecoder.decodeRow(this.currentMessage.getData()); - decodedValue.ifPresent(currentRowValuesMap::putAll); + if (this.currentMessage.getData().readableBytes() > 0) { + Optional> decodedValue = + messageDecoder.decodeRow(this.currentMessage.getData()); + decodedValue.ifPresent(currentRowValuesMap::putAll); + } } for (DecoderColumnHandle columnHandle : columnHandles) { @@ -622,6 +711,8 @@ public boolean advanceNextPosition() { } catch (JsonProcessingException e) { throw new RuntimeException(e); } + } else if (COMPACTED_QUERY.getName().equals(columnHandle.getName())) { + currentRowValuesMap.put(columnHandle, booleanValueProvider(isCompactedQuery)); } else { throw new IllegalArgumentException("unknown internal field " + columnHandle.getName()); } @@ -748,6 +839,10 @@ public void close() { } } + if (this.compactedLedgerReader != null) { + this.compactedLedgerReader.close(); + } + // set stat for total execution time of split if (this.metricsTracker != null) { this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - startTime); @@ -867,4 +962,294 @@ public void recycle() { } } + private void cacheCompactedMessageIds(String key, BatchMessageIdImpl messageId, long payloadSize) { + if (key == null) { + return; + } + if (payloadSize > 0) { + compactedMessageIds.put(key, messageId); + } else { + 
compactedMessageIds.remove(key); + } + } + + class CompactedLedgerReader implements AsyncCallback.ReadCallback, Runnable { + + private final int readSize = 1000; + private final SpscArrayQueue queue; + private final AtomicBoolean havePendingRead = new AtomicBoolean(false); + private long startEntry = 0; + private long readEntry = -1; + private MessageIdData firstUnCompactedMessageId; + private long compactedLedgerId = -1; + private LedgerHandle compactedLedgerHandle; + + CompactedLedgerReader() { + queue = new SpscArrayQueue<>((int) (readSize * 1.5)); + firstUnCompactedMessageId = new MessageIdData(); + firstUnCompactedMessageId.setLedgerId(-1); + firstUnCompactedMessageId.setEntryId(-1); + try { + initCompactedRead(); + } catch (Exception e) { + log.error(e, "Failed to init compacted reader."); + throw new RuntimeException(e); + } + } + + private void initCompactedRead() throws Exception { + String topic = topicName.getPartition(partition).toString(); + PersistentTopicInternalStats internalStats = + schemaInfoProvider.getPulsarAdmin().topics().getInternalStats(topic); + if (internalStats.compactedLedger == null + || internalStats.compactedLedger.ledgerId == -1 + || internalStats.compactedLedger.entries == 0) { + return; + } + this.compactedLedgerId = internalStats.compactedLedger.ledgerId; + this.compactedLedgerHandle = bookKeeper.openLedger(compactedLedgerId, + BookKeeper.DigestType.fromApiDigestType(DigestType.CRC32), "".getBytes()); + } + + private void readMoreEntriesIfNeed() { + if (havePendingRead.get()) { + return; + } + if (startEntry > compactedLedgerHandle.getLastAddConfirmed()) { + return; + } + if (queue.size() < readSize / 2) { + havePendingRead.set(true); + long endEntry = Math.min(compactedLedgerHandle.getLastAddConfirmed(), startEntry + readSize); + compactedLedgerHandle.asyncReadEntries(startEntry, endEntry, this, null); + } + } + + @Override + public void readComplete(int i, LedgerHandle ledgerHandle, Enumeration enumeration, Object o) { + if 
(enumeration == null) { + havePendingRead.set(false); + return; + } + while (enumeration.hasMoreElements()) { + LedgerEntry ledgerEntry = enumeration.nextElement(); + queue.offer(ledgerEntry); + startEntry = ledgerEntry.getEntryId() + 1; + } + havePendingRead.set(false); + } + + @Override + public void run() { + readCompactedData(); + readUnCompactedData(); + readCompleteData(); + } + + private void readCompactedData() { + if (compactedLedgerId == -1 || compactedLedgerHandle == null) { + log.info("[%s] Compacted ledger is not exist.", topicName); + return; + } + + log.info("[%s] Start to read compacted data, compacted ledger is %d, the LAC of compacted ledger is %d.", + topicName, compactedLedgerId, compactedLedgerHandle.getLastAddConfirmed()); + while (readEntry < compactedLedgerHandle.getLastAddConfirmed()) { + readMoreEntriesIfNeed(); + LedgerEntry ledgerEntry = queue.poll(); + if (ledgerEntry == null) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + continue; + } + + ByteBuf buffer = ledgerEntry.getEntryBuffer(); + int idSize = buffer.readInt(); + + firstUnCompactedMessageId = new MessageIdData(); + firstUnCompactedMessageId.parseFrom(buffer, idSize); + int payloadAndMetadataSize = buffer.readInt(); + ByteBuf metadataAndPayload = buffer.slice(buffer.readerIndex(), payloadAndMetadataSize); + MessageMetadata messageMetadata = Commands.parseMessageMetadata(metadataAndPayload); + + if (messageMetadata.hasNumMessagesInBatch()) { + try { + handleCompactedBatchData( + ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), ledgerEntry.getEntryBuffer()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + cacheCompactedMessageIds(messageMetadata.getPartitionKey(), new BatchMessageIdImpl( + ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), partition, 0), + messageMetadata.getUncompressedSize()); + } + readEntry = ledgerEntry.getEntryId(); + } + log.info("[%s] Finish to read compacted data, 
read entry is %s, compacted messages count %d.", + topicName, compactedLedgerId + ":" + readEntry, compactedMessageIds.size()); + } + + private void handleCompactedBatchData(long ledgerId, long entryId, ByteBuf payload) throws IOException { + MessageMetadata metadata = Commands.parseMessageMetadata(payload); + int batchSize = metadata.getNumMessagesInBatch(); + + CompressionType compressionType = metadata.getCompression(); + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); + int uncompressedSize = metadata.getUncompressedSize(); + ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize); + + SingleMessageMetadata smm = new SingleMessageMetadata(); + for (int i = 0; i < batchSize; i++) { + ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch( + uncompressedPayload, smm, i, batchSize); + // Only keyed messages are cached; the payload buffer is released for keyless ones as well. + if (smm.hasPartitionKey()) { + BatchMessageIdImpl id = new BatchMessageIdImpl(ledgerId, entryId, partition, i); + cacheCompactedMessageIds(smm.getPartitionKey(), id, smm.getPayloadSize()); + } + singleMessagePayload.release(); + } + uncompressedPayload.release(); + } + + private void readUnCompactedData() { + log.info("[%s] Start to read unCompacted data, first unCompacted message id is %s.", topicName, + firstUnCompactedMessageId.getLedgerId() + ":" + firstUnCompactedMessageId.getEntryId()); + try { + PositionImpl startPosition; + if (firstUnCompactedMessageId.getLedgerId() == -1 && firstUnCompactedMessageId.getEntryId() == -1) { + startPosition = PositionImpl.EARLIEST; + } else { + startPosition = PositionImpl.get(firstUnCompactedMessageId.getLedgerId(), + firstUnCompactedMessageId.getEntryId()).getNext(); + } + cursor = getCursor(topicName, startPosition, managedLedgerFactory, managedLedgerConfig); + + readEntries = new ReadEntries(); + readEntries.run(); + + deserializeEntries = new DeserializeEntries(); + deserializeEntries.setUncaughtExceptionHandler((t, ex) -> { + deserializingError = ex; + });
+ deserializeEntries.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // Make sure all data read completely, find the latest data of the key. + while (!readEntries.hasFinished()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + log.info("[%s] Finish to read unCompacted data, read position is %s.", + topicName, cursor.getReadPosition().toString()); + } + + private void readCompleteData() { + log.info("[%s] Start to read complete data, compacted messages count %d.", + topicName, compactedMessageIds.size()); + Iterator messageIdIterator = compactedMessageIds.values().stream().sorted().iterator(); + LedgerHandle readLedgerHandle = null; + AtomicBoolean isCompactedLedger = new AtomicBoolean(false); + while (messageIdIterator.hasNext()) { + BatchMessageIdImpl messageId = messageIdIterator.next(); + readLedgerHandle = getLedgerHandle(messageId, isCompactedLedger, readLedgerHandle); + readEntries(readLedgerHandle, messageId.getLedgerId(), messageId.getEntryId(), + messageId.getBatchIndex(), isCompactedLedger.get()); + } + if (readLedgerHandle != null) { + try { + readLedgerHandle.close(); + } catch (Exception e) { + log.warn("Failed to close ledger handle for ledger %s.", readLedgerHandle.getId()); + } + } + log.info("[%s] Finish to read complete data.", topicName); + } + + private LedgerHandle getLedgerHandle(MessageIdImpl messageId, + AtomicBoolean isCompactedLedger, + LedgerHandle readLedgerHandle) { + if (messageId.getLedgerId() == compactedLedgerId) { + isCompactedLedger.set(true); + return compactedLedgerHandle; + } else { + try { + if (readLedgerHandle == null || readLedgerHandle.getId() != messageId.getLedgerId()) { + if (readLedgerHandle != null) { + readLedgerHandle.close(); + } + isCompactedLedger.set(false); + return bookKeeper.openLedger( + messageId.getLedgerId(), BookKeeper.DigestType.CRC32C, "".getBytes()); + } else { + return readLedgerHandle; + } + } catch (Exception e) 
{ + log.error(e, "Failed to open ledger handle for ledger %s.", messageId.getLedgerId()); + throw new RuntimeException(e); + } + } + } + + private void readEntries(LedgerHandle ledgerHandle, long ledgerId, long entryId, int batchIndex, + boolean isCompactedLedger) { + try { + LedgerEntries ledgerEntries = + ledgerHandle.read(entryId, entryId); + Iterator iterator = ledgerEntries.iterator(); + if (iterator.hasNext()) { + org.apache.bookkeeper.client.api.LedgerEntry ledgerEntry = iterator.next(); + ByteBuf buffer = ledgerEntry.getEntryBuffer(); + + if (isCompactedLedger) { + int idSize = buffer.readInt(); + MessageIdData lastMessageId = new MessageIdData(); + lastMessageId.parseFrom(buffer, idSize); + int payloadAndMetadataSize = buffer.readInt(); + buffer = buffer.slice(buffer.readerIndex(), payloadAndMetadataSize); + } + + parseAndAddMessageToQueue(ledgerId, entryId, batchIndex, buffer); + } + } catch (Exception e) { + log.error(e, "Failed to read original entries."); + throw new RuntimeException(e); + } + } + + private void parseAndAddMessageToQueue( + long ledgerId, long entryId, int batchIndex, ByteBuf entryBuffer) throws IOException { + MessageParser.parseMessage(topicName, ledgerId, + entryId, entryBuffer, message -> { + if (((RawMessageIdImpl) message.getMessageId()).getBatchIndex() == batchIndex) { + messageQueue.offer(message); + if (message.getKey().isPresent()) { + compactedMessageIds.remove(message.getKey().get()); + } + } + }, pulsarConnectorConfig.getMaxMessageSize()); + } + + private void close() { + if (this.compactedLedgerHandle != null) { + try { + this.compactedLedgerHandle.close(); + } catch (Exception e) { + log.error(e, "Failed to close compacted leger handle."); + } + } + } + + } + + } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java index 1967ec5e436b6..646ad8e048273 100644 --- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java @@ -63,6 +63,7 @@ public class PulsarSplit implements ConnectorSplit { private final String schemaInfoProperties; private final OffloadPoliciesImpl offloadPolicies; + private final Boolean compactedQuery; @JsonCreator public PulsarSplit( @@ -80,7 +81,8 @@ public PulsarSplit( @JsonProperty("endPositionLedgerId") long endPositionLedgerId, @JsonProperty("tupleDomain") TupleDomain tupleDomain, @JsonProperty("schemaInfoProperties") String schemaInfoProperties, - @JsonProperty("offloadPolicies") OffloadPoliciesImpl offloadPolicies) throws IOException { + @JsonProperty("offloadPolicies") OffloadPoliciesImpl offloadPolicies, + @JsonProperty("compactedQuery") Boolean compactedQuery) throws IOException { this.splitId = splitId; requireNonNull(schemaName, "schema name is null"); this.originSchemaName = originSchemaName; @@ -107,6 +109,8 @@ public PulsarSplit( .schema(schema.getBytes("ISO8859-1")) .properties(objectMapper.readValue(schemaInfoProperties, Map.class)) .build(); + // Normalize a null flag to false so that callers can unbox isCompactedQuery() safely. + this.compactedQuery = Boolean.TRUE.equals(compactedQuery); } @JsonProperty @@ -192,6 +196,11 @@ public OffloadPoliciesImpl getOffloadPolicies() { return offloadPolicies; } + @JsonProperty + public Boolean isCompactedQuery() { + return compactedQuery; + } + @Override public boolean isRemotelyAccessible() { return true; @@ -223,6 +232,7 @@ public String toString() { + ", startPositionLedgerId=" + startPositionLedgerId + ", endPositionLedgerId=" + endPositionLedgerId + ", schemaInfoProperties=" + schemaInfoProperties + + ", compactedQuery=" + compactedQuery + (offloadPolicies == null ?
"" : offloadPolicies.toString()) + '}'; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 464e70b18dddd..28e07f52bb554 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -45,11 +45,13 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import javax.inject.Inject; import lombok.Data; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -59,6 +61,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaInfo; @@ -77,6 +80,8 @@ public class PulsarSplitManager implements ConnectorSplitManager { private final ObjectMapper objectMapper = new ObjectMapper(); + private static final String COMPACTION_SUBSCRIPTION = "__compaction"; + @Inject public PulsarSplitManager(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) { this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); @@ -122,6 +127,17 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand } } + AtomicReference 
readCompactedTypeReference = new AtomicReference<>(false); + tupleDomain.getDomains().ifPresent(domainMap -> { + Domain readCompactedDomain = domainMap.get( + PulsarInternalColumn.COMPACTED_QUERY.getColumnHandle(connectorId, false)); + if (readCompactedDomain != null) { + readCompactedTypeReference.set((Boolean) readCompactedDomain.getSingleValue()); + } + + }); + boolean readCompactedType = readCompactedTypeReference.get(); + Collection splits; try { OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) this.pulsarAdmin.namespaces() @@ -133,11 +149,11 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand } if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) { splits = getSplitsNonPartitionedTopic( - numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies); + numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies, readCompactedType); log.debug("Splits for non-partitioned topic %s: %s", topicName, splits); } else { splits = getSplitsPartitionedTopic( - numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies); + numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies, readCompactedType); log.debug("Splits for partitioned topic %s: %s", topicName, splits); } } catch (Exception e) { @@ -150,7 +166,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand @VisibleForTesting Collection getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain, - OffloadPoliciesImpl offloadPolicies) throws Exception { + OffloadPoliciesImpl offloadPolicies, boolean readCompacted) throws Exception { List predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain); if (log.isDebugEnabled()) { @@ -163,6 +179,11 @@ Collection getSplitsPartitionedTopic(int numSplits, TopicName topic int splitRemainder = actualNumSplits % 
predicatedPartitions.size(); + if (readCompacted) { + splitsPerPartition = 1; + splitRemainder = 0; + } + PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig); ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory(); ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig( @@ -173,7 +194,7 @@ Collection getSplitsPartitionedTopic(int numSplits, TopicName topic int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition; splits.addAll( getSplitsForTopic( - topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(), + topicName.getPartition(predicatedPartitions.get(i)), managedLedgerFactory, managedLedgerConfig, splitsForThisPartition, @@ -181,7 +202,8 @@ Collection getSplitsPartitionedTopic(int numSplits, TopicName topic schemaInfo, topicName.getPartition(predicatedPartitions.get(i)).getLocalName(), tupleDomain, - offloadPolicies)); + offloadPolicies, + readCompacted)); } return splits; } @@ -238,14 +260,14 @@ private List getPredicatedPartitions(TopicName topicName, TupleDomain getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain, - OffloadPoliciesImpl offloadPolicies) throws Exception { + OffloadPoliciesImpl offloadPolicies, boolean readCompacted) throws Exception { PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig); ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory(); ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig( topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig); return getSplitsForTopic( - topicName.getPersistenceNamingEncoding(), + topicName, managedLedgerFactory, managedLedgerConfig, numSplits, @@ -253,38 +275,58 @@ Collection 
getSplitsNonPartitionedTopic(int numSplits, TopicName to schemaInfo, topicName.getLocalName(), tupleDomain, - offloadPolicies); + offloadPolicies, + readCompacted); } @VisibleForTesting - Collection getSplitsForTopic(String topicNamePersistenceEncoding, + Collection getSplitsForTopic(TopicName topicName, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, int numSplits, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, String tableName, TupleDomain tupleDomain, - OffloadPoliciesImpl offloadPolicies) - throws ManagedLedgerException, InterruptedException, IOException { + OffloadPoliciesImpl offloadPolicies, + boolean readCompacted) + throws ManagedLedgerException, InterruptedException, IOException, PulsarAdminException { ReadOnlyCursor readOnlyCursor = null; try { + Position cursorStartPos = PositionImpl.EARLIEST; + if (readCompacted) { + PersistentTopicInternalStats internalStats = pulsarAdmin.topics() + .getInternalStats(topicName.toString()); + if (internalStats.cursors.containsKey(COMPACTION_SUBSCRIPTION)) { + String[] compactedHorizonArr = + internalStats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition.split(":"); + cursorStartPos = PositionImpl.get( + Long.parseLong(compactedHorizonArr[0]), Long.parseLong(compactedHorizonArr[1])); + } + numSplits = 1; + } + + String topicNamePersistenceEncoding = topicName.getPersistenceNamingEncoding(); readOnlyCursor = managedLedgerFactory.openReadOnlyCursor( - topicNamePersistenceEncoding, - PositionImpl.EARLIEST, managedLedgerConfig); + topicName.getPersistenceNamingEncoding(), + cursorStartPos, managedLedgerConfig); long numEntries = readOnlyCursor.getNumberOfEntries(); - if (numEntries <= 0) { + if (numEntries <= 0 && !readCompacted) { return Collections.emptyList(); } - PredicatePushdownInfo predicatePushdownInfo = PredicatePushdownInfo.getPredicatePushdownInfo( - this.connectorId, - tupleDomain, - managedLedgerFactory, - managedLedgerConfig, - topicNamePersistenceEncoding, - 
numEntries); + PredicatePushdownInfo predicatePushdownInfo = null; + if (numEntries > 0) { + predicatePushdownInfo = PredicatePushdownInfo.getPredicatePushdownInfo( + this.connectorId, + tupleDomain, + managedLedgerFactory, + managedLedgerConfig, + topicNamePersistenceEncoding, + numEntries, + cursorStartPos); + } PositionImpl initialStartPosition; if (predicatePushdownInfo != null) { @@ -308,7 +350,9 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, for (int i = 0; i < numSplits; i++) { long entriesForSplit = (remainder > i) ? avgEntriesPerSplit + 1 : avgEntriesPerSplit; PositionImpl startPosition = (PositionImpl) readOnlyCursor.getReadPosition(); - readOnlyCursor.skipEntries(Math.toIntExact(entriesForSplit)); + if (entriesForSplit > 0) { + readOnlyCursor.skipEntries(Math.toIntExact(entriesForSplit)); + } PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition(); PulsarSplit pulsarSplit = new PulsarSplit(i, this.connectorId, @@ -324,7 +368,8 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, endPosition.getLedgerId(), tupleDomain, objectMapper.writeValueAsString(schemaInfo.getProperties()), - offloadPolicies); + offloadPolicies, + readCompacted); splits.add(pulsarSplit); } return splits; @@ -356,14 +401,15 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, String topicNamePersistenceEncoding, - long totalNumEntries) throws + long totalNumEntries, + Position startPos) throws ManagedLedgerException, InterruptedException { ReadOnlyCursor readOnlyCursor = null; try { readOnlyCursor = managedLedgerFactory.openReadOnlyCursor( topicNamePersistenceEncoding, - PositionImpl.EARLIEST, managedLedgerConfig); + startPos, managedLedgerConfig); if (tupleDomain.getDomains().isPresent()) { Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java index d8f7db96b83d7..90271f7870ab0 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Schema; @@ -48,6 +49,7 @@ public class PulsarSqlSchemaInfoProvider implements SchemaInfoProvider { private final TopicName topicName; + @Getter private final PulsarAdmin pulsarAdmin; private final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java index 716cd2cf9d663..a04b7396d3e12 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java @@ -114,7 +114,8 @@ public void cacheSizeAllocatorTest(long entryQueueSizeBytes) throws Exception { lastPosition.getLedgerId(), TupleDomain.all(), objectMapper.writeValueAsString(new HashMap<>()), - null); + null, + false); List pulsarColumnHandles = TestPulsarConnector.getColumnColumnHandles( topicName, Schema.BYTES.getSchemaInfo(), PulsarColumnHandle.HandleKeyValueType.NONE, true); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCompactedQuery.java 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCompactedQuery.java new file mode 100644 index 0000000000000..2d5a21a6e86cd --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCompactedQuery.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.sql.presto; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.airlift.slice.Slice; +import io.trino.spi.connector.ConnectorContext; +import io.trino.spi.predicate.TupleDomain; +import io.trino.testing.TestingConnectorContext; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.schema.SchemaType; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import 
org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class TestCompactedQuery extends MockedPulsarServiceBaseTest { + + @BeforeClass + @Override + public void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + + // so that clients can test short names + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + static class Stock { + private String no; + private Double price; + } + + @DataProvider(name = "compactInfoProvider") + public Object[][] compactInfoProvider() { + return new Object[][]{ + {0, 0}, + {0, 100}, + {100, 0}, + {100, 100}, + }; + } + + @Test(dataProvider = "compactInfoProvider") + public void compactQueryForBatchMessages(int compactedMsgNum, int unCompactedMsgNum) throws Exception { + compactQuery(true, compactedMsgNum, unCompactedMsgNum); + } + + @Test(dataProvider = "compactInfoProvider") + public void compactQueryForNonBatchMessages(int compactedMsgNum, int unCompactedMsgNum) throws Exception { + compactQuery(false, compactedMsgNum, unCompactedMsgNum); + } + + private void compactQuery(boolean enableBatch, int compactedMsgNum, int unCompactedMsgNum) throws Exception { + TopicName topicName = TopicName.get( + TopicDomain.persistent.toString(), "public", "default", + RandomStringUtils.randomAlphabetic(5)); + + pulsarClient.newConsumer(Schema.AVRO(Stock.class)) + .topic(topicName.toString()) + .readCompacted(true) + .subscriptionName("sub") + .subscribe() + .close(); + + Producer producer = 
pulsarClient.newProducer(Schema.AVRO(Stock.class)) + .topic(topicName.toString()) + .enableBatching(enableBatch) + .batchingMaxMessages(5) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .create(); + + Map latestPrice = new HashMap<>(); + AtomicReference firstMessageId = new AtomicReference<>((MessageIdImpl) MessageId.earliest); + AtomicReference lastMessageId = new AtomicReference<>((MessageIdImpl) MessageId.latest); + int divisor = 8; + + prepareData(producer, compactedMsgNum, divisor, latestPrice, firstMessageId, lastMessageId); + + admin.topics().triggerCompaction(topicName.toString()); + Awaitility.await().until(() -> { + LongRunningProcessStatus status = admin.topics().compactionStatus(topicName.toString()); + return Objects.equals(LongRunningProcessStatus.Status.SUCCESS, status.status); + }); + + Set entrySet = + prepareData(producer, unCompactedMsgNum, divisor, latestPrice, firstMessageId, lastMessageId); + if (compactedMsgNum > 0 || unCompactedMsgNum > 0) { + assertEquals(divisor, latestPrice.size()); + } + log.info("finish to prepare compaction data"); + + ObjectMapper objectMapper = new ObjectMapper(); + PulsarSplit pulsarSplit = new PulsarSplit( + 0, + "connector-id", + topicName.getNamespace(), + topicName.getLocalName(), + topicName.getLocalName(), + entrySet.size(), + new String(Schema.AVRO(Stock.class).getSchemaInfo().getSchema()), + SchemaType.AVRO, + firstMessageId.get().getEntryId(), + lastMessageId.get().getEntryId(), + firstMessageId.get().getLedgerId(), + lastMessageId.get().getLedgerId(), + TupleDomain.all(), + objectMapper.writeValueAsString(new HashMap<>()), + null, + true); + + List pulsarColumnHandles = TestPulsarConnector.getColumnColumnHandles( + topicName, Schema.AVRO(Stock.class).getSchemaInfo(), PulsarColumnHandle.HandleKeyValueType.NONE, true); + + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + connectorConfig.setBrokerServiceUrl(admin.getServiceUrl()); + connectorConfig.setMetadataUrl("zk:localhost:2181"); 
+ ConnectorContext prestoConnectorContext = new TestingConnectorContext(); + PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor( + pulsarColumnHandles, pulsarSplit, connectorConfig, pulsar.getManagedLedgerFactory(), + new ManagedLedgerConfig(), new PulsarConnectorMetricsTracker(new NullStatsProvider()), + new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager()), + this.pulsarTestContext.getMockBookKeeper()); + + List columns = pulsarRecordCursor.getColumnHandles(); + while (pulsarRecordCursor.advanceNextPosition()) { + String name = null; + String key = null; + Double price = null; + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equalsIgnoreCase("no")) { + Slice slice = pulsarRecordCursor.getSlice(i); + name = slice == null ? null : new String(slice.getBytes()); + } else if (columns.get(i).getName().equalsIgnoreCase("price")) { + price = pulsarRecordCursor.getDouble(i); + } else if (columns.get(i).getName().equalsIgnoreCase(PulsarInternalColumn.KEY.getName())) { + Slice slice = pulsarRecordCursor.getSlice(i); + key = slice == null ? 
null : new String(slice.getBytes()); + } + } + assertNotNull(key); + assertNotNull(name); + assertNotNull(price); + assertTrue(latestPrice.containsKey(key)); + assertEquals(key, name); + assertEquals(price, latestPrice.remove(key)); + } + if (compactedMsgNum > 0 || unCompactedMsgNum > 0) { + assertEquals(latestPrice.size(), 1); + assertNotNull(latestPrice.remove("stock-7")); + } else { + assertEquals(latestPrice.size(), 0); + } + } + + private Set prepareData(Producer producer, int messageCount, int divisor, + Map latestPrice, + AtomicReference firstMessageId, + AtomicReference lastMessageId) throws PulsarClientException { + Set entrySet = new HashSet<>(); + AtomicInteger sendCount = new AtomicInteger(); + for (int i = 0; i < messageCount; i++) { + String name = "stock-" + i % divisor; + Double price = BigDecimal.valueOf( + RandomUtils.nextDouble(10, 100)).setScale(4, RoundingMode.HALF_UP).doubleValue(); + final int index = i; + final Stock stock = new Stock(name, price); + producer.newMessage().key(name).value(stock).sendAsync() + .thenAccept(messageId -> { + MessageIdImpl idImpl = (MessageIdImpl) messageId; + if (index == 0) { + firstMessageId.set(idImpl); + } + entrySet.add(idImpl.getLedgerId() + ":" + idImpl.getEntryId()); + latestPrice.put(name, price); + sendCount.incrementAndGet(); + }); + } + if (messageCount > 0) { + MessageIdImpl idImpl = (MessageIdImpl) producer.newMessage().key("stock-" + 7).send(); + entrySet.add(idImpl.getLedgerId() + ":" + idImpl.getEntryId() + 1); + lastMessageId.set(new MessageIdImpl( + idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex())); + } + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> sendCount.get() == messageCount); + return entrySet; + } + +} diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 61f6edd38530c..51591247c91b1 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -296,7 +296,7 @@ public static class Bar { 0, topicsToNumEntries.get(topicName.getSchemaName()), 0, 0, TupleDomain.all(), objectMapper.writeValueAsString( - topicsToSchemas.get(topicName.getSchemaName()).getProperties()), null)); + topicsToSchemas.get(topicName.getSchemaName()).getProperties()), null, null)); } } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java index 7eaa2da498f45..681bcccc3be1d 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java @@ -420,7 +420,7 @@ public Long answer(InvocationOnMock invocationOnMock) throws Throwable { 0, entriesNum, 0, 0, TupleDomain.all(), objectMapper.writeValueAsString( - schema.getSchemaInfo().getProperties()), null); + schema.getSchemaInfo().getProperties()), null, null); PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor( ColumnHandles, split, diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java index 86b2ee56c85fe..f0e3302cf24a1 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java @@ -96,7 +96,7 @@ public void testTopic(String delimiter) throws Exception { 
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all()); final ResultCaptor> resultCaptor = new ResultCaptor<>(); - doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits( @@ -104,7 +104,7 @@ public void testTopic(String delimiter) throws Exception { pulsarTableLayoutHandle, null); verify(this.pulsarSplitManager, times(1)) - .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); int totalSize = 0; for (PulsarSplit pulsarSplit : resultCaptor.getResult()) { @@ -143,13 +143,13 @@ public void testPartitionedTopic(String delimiter) throws Exception { PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all()); final ResultCaptor> resultCaptor = new ResultCaptor<>(); - doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); this.pulsarSplitManager.getSplits(mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class), pulsarTableLayoutHandle, null); verify(this.pulsarSplitManager, times(1)) - .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); int partitions = partitionedTopicsToPartitions.get(topicName.toString()); @@ -210,7 +210,7 @@ public void testPublishTimePredicatePushdown(String delimiter) throws Exception final ResultCaptor> resultCaptor = new 
ResultCaptor<>(); doAnswer(resultCaptor).when(this.pulsarSplitManager) - .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits( mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class), @@ -218,7 +218,7 @@ public void testPublishTimePredicatePushdown(String delimiter) throws Exception verify(this.pulsarSplitManager, times(1)) - .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); int totalSize = 0; int initalStart = 1; @@ -267,7 +267,7 @@ public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) t final ResultCaptor> resultCaptor = new ResultCaptor<>(); doAnswer(resultCaptor).when(this.pulsarSplitManager) - .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits( mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class), @@ -275,7 +275,7 @@ public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) t verify(this.pulsarSplitManager, times(1)) - .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); + .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any(), any()); int partitions = partitionedTopicsToPartitions.get(topicName.toString()); @@ -324,7 +324,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); TupleDomain tupleDomain = TupleDomain.withColumnDomains(domainMap); Collection splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle, - 
schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null, false); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 2); } @@ -341,7 +341,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); tupleDomain = TupleDomain.withColumnDomains(domainMap); splits = this.pulsarSplitManager.getSplitsPartitionedTopic(1, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null, false); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 2); } @@ -357,7 +357,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); tupleDomain = TupleDomain.withColumnDomains(domainMap); splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null, false); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 3); } @@ -376,7 +376,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); tupleDomain = TupleDomain.withColumnDomains(domainMap); splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null, false); if (topicsToNumEntries.get(topicName.getSchemaName()) > 
1) { Assert.assertEquals(splits.size(), 4); } @@ -388,7 +388,6 @@ public void testPartitionFilter(String delimiter) throws Exception { } } - } @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true) @@ -454,7 +453,7 @@ public void pulsarSplitJsonCodecTest() throws JsonProcessingException, Unsupport PulsarSplit pulsarSplit = new PulsarSplit( splitId, connectorId, schemaName, originSchemaName, tableName, splitSize, schema, schemaType, startPositionEntryId, endPositionEntryId, startPositionLedgerId, - endPositionLedgerId, tupleDomain, schemaInfoProperties, offloadPolicies); + endPositionLedgerId, tupleDomain, schemaInfoProperties, offloadPolicies, null); pulsarSplitData = jsonCodec.toJsonBytes(pulsarSplit); } catch (Exception e) { e.printStackTrace(); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java index 1e959e9b83059..8f857563e36f3 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java @@ -107,7 +107,7 @@ public void queryTest() throws Exception { connectorConfig.setWebServiceUrl(pulsar.getWebServiceAddress()); PulsarSplitManager pulsarSplitManager = new PulsarSplitManager(new PulsarConnectorId("1"), connectorConfig); Collection splits = pulsarSplitManager.getSplitsForTopic( - topicName.getPersistenceNamingEncoding(), + topicName, pulsar.getManagedLedgerFactory(), new ManagedLedgerConfig(), 3, @@ -115,7 +115,8 @@ public void queryTest() throws Exception { schemaInfo, topic, TupleDomain.all(), - null); + null, + false); List columnHandleList = TestPulsarConnector.getColumnColumnHandles( topicName, schemaInfo, PulsarColumnHandle.HandleKeyValueType.NONE, true); diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 7658883441f5b..d3bb94ba88903 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -19,13 +19,25 @@ package org.apache.pulsar.tests.integration.presto; import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -34,12 +46,14 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import 
org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -359,8 +373,112 @@ public void testQueueBigEntry() throws Exception { producer.newMessage().value(data).send(); } - int count = selectCount("public/default", tableName); + int count = selectCount("public/default", tableName, null); Assert.assertEquals(count, messageCnt); } + @DataProvider(name = "compactedQueryProvider") + public Object[][] compactedQueryProvider() { + return new Object[][] { + {0, 0}, + {0, 100}, + {100, 0}, + {100, 100} + }; + } + + @Test(timeOut = 1000 * 30, dataProvider = "compactedQueryProvider") + public void testCompactedQueryForNoBatchData(int compactedCount, int noCompactedCount) throws Exception { + testCompactedQuery(false, compactedCount, noCompactedCount); + } + + @Test(timeOut = 1000 * 30, dataProvider = "compactedQueryProvider") + public void testCompactedQueryForBatchData(int compactedCount, int noCompactedCount) throws Exception { + testCompactedQuery(true, compactedCount, noCompactedCount); + } + + private void testCompactedQuery(boolean enableBatch, int compactedCount, int noCompactedCount) throws Exception { + String tableName = "compacted-topic-" + randomName(5); + String topic = "public/default/" + tableName; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.AVRO(Stock.class)) + .topic(topic) + .enableBatching(enableBatch) + .batchingMaxMessages(10) + .create(); + + int divisor = 8; + String removeKey = "4"; + Map latestStocks = new HashMap<>(); + prepareDataForCompactedQuery(producer, latestStocks, compactedCount, divisor, removeKey); + if (compactedCount > 0) { + pulsarAdmin.topics().triggerCompaction(topic); + Awaitility.await().until(() -> { + LongRunningProcessStatus status = 
pulsarAdmin.topics().compactionStatus(topic); + return Objects.equals(LongRunningProcessStatus.Status.SUCCESS, status.status); + }); + } + prepareDataForCompactedQuery(producer, latestStocks, noCompactedCount, divisor, removeKey); + assertEquals(latestStocks.size(), (compactedCount > 0 || noCompactedCount > 0 ? divisor : 0)); + + PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(topic); + log.info("check internal stats for topic {}", topic); + log.info(new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(internalStats)); + + Awaitility.await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> { + int count = selectCount("public/default", tableName, "__compacted_query__=true"); + log.info("select count result for table {} is {}.", tableName, count); + return count == (compactedCount > 0 || noCompactedCount > 0 ? divisor - 1 : 0); + }); + ContainerExecResult result = execQuery( + "select __key__,symbol,sharePrice from pulsar.\"public/default\".\"" + tableName + + "\" where __compacted_query__=true"); + assertThat(result.getExitCode()).isEqualTo(0); + log.info("select sql query output \n{}", result.getStdout()); + if (compactedCount == 0 && noCompactedCount == 0) { + assertTrue(latestStocks.isEmpty()); + assertTrue(result.getStdout().trim().isEmpty()); + return; + } + String[] dataArray = result.getStdout().split("\n"); + for (String data : dataArray) { + String[] columns = data.split(","); + Stock expectedStock = latestStocks.remove(columns[0].replace("\"", "")); + assertNotNull(expectedStock); + assertEquals(columns[1].replace("\"", ""), expectedStock.getSymbol()); + assertEquals(columns[2].replace("\"", ""), "" + expectedStock.getSharePrice()); + } + assertEquals(1, latestStocks.size()); + assertNotNull(latestStocks.remove(removeKey)); + } + + private void prepareDataForCompactedQuery(Producer producer, Map latestStocks, + int messageCount, int divisor, String removeKey) + throws 
PulsarClientException { + + AtomicInteger sendCount = new AtomicInteger(); + for (int i = 0; i < messageCount; i++) { + int entryId = i % divisor; + String key = "" + entryId; + Stock stock = new Stock(entryId, "stock-" + entryId, RandomUtils.nextDouble(50, 90)); + CompletableFuture future = producer.newMessage().key(key).value(stock).sendAsync(); + future.thenApply(messageId -> { + latestStocks.put(key, stock); + sendCount.incrementAndGet(); + return null; + }); + } + if (messageCount > 0) { + producer.newMessage().key(removeKey).send(); + } + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> sendCount.get() == messageCount); + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java index 2833be67bc8e3..862aad3b1e515 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java @@ -264,7 +264,7 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size()); assertThat(returnedTimestamps.size()).isEqualTo(0); - int count = selectCount(namespace, topic); + int count = selectCount(namespace, topic, null); assertThat(count).isGreaterThan(messageNum - 2); } @@ -314,8 +314,13 @@ private static void printCurrent(ResultSet rs) throws SQLException { } - protected int selectCount(String namespace, String tableName) throws SQLException { - String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName); + protected int selectCount(String namespace, String tableName, String condition) throws SQLException { + if (condition != null) { + condition = " 
where " + condition; + } else { + condition = ""; + } + String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"%s", namespace, tableName, condition); log.info("Executing count query: {}", query); ResultSet res = connection.createStatement().executeQuery(query); res.next();