Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
}

static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
public static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {

private final BookKeeper bkClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
-Dpresto-temporarily-allow-java8=true
-Djdk.attach.allowAttachSelf=true
-Djdk.attach.allowAttachSelf=true
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
Expand All @@ -35,6 +36,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
Expand Down Expand Up @@ -68,14 +70,17 @@ public class PulsarConnectorCache {
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();

@Getter
private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperFactory;

private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";


private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getMetadataUrl(),
MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
this.bookKeeperFactory = initBookKeeperFactory(pulsarConnectorConfig);
this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
StatsProvider.class, getClass().getClassLoader());
Expand Down Expand Up @@ -108,27 +113,32 @@ public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsa
return instance;
}

private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
private BookkeeperFactoryForCustomEnsemblePlacementPolicy initBookKeeperFactory(
PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
PulsarMetadataClientDriver.init();

ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setMetadataServiceUri("metadata-store:" + pulsarConnectorConfig.getMetadataUrl())
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
.setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
.setStickyReadsEnabled(false)
.setReadEntryTimeout(60)
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
.setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads())
.setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
.setMetadataServiceUri("metadata-store:" + pulsarConnectorConfig.getMetadataUrl())
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
.setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
.setStickyReadsEnabled(false)
.setReadEntryTimeout(60)
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
.setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads())
.setNettyMaxFrameSizeBytes(
pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
return new ManagedLedgerFactoryImpl.DefaultBkFactory(bkClientConfiguration);
}

private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(
pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());
return new ManagedLedgerFactoryImpl(metadataStore, bkClientConfiguration, managedLedgerFactoryConfig);
return new ManagedLedgerFactoryImpl(metadataStore, bookKeeperFactory, managedLedgerFactoryConfig);
}

public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPoliciesImpl offloadPolicies,
Expand Down Expand Up @@ -206,6 +216,7 @@ public static void shutdown() throws Exception {
if (instance != null) {
instance.statsProvider.stop();
instance.managedLedgerFactory.shutdown();
instance.bookKeeperFactory.get().close();
instance.metadataStore.close();
instance.offloaderScheduler.shutdown();
instance.offloadersCache.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -62,8 +63,11 @@ public class PulsarInternalColumn {
public static final PulsarInternalColumn PROPERTIES = new PulsarInternalColumn("__properties__",
VarcharType.VARCHAR, "User defined properties");

public static final PulsarInternalColumn COMPACTED_QUERY = new PulsarInternalColumn("__compacted_query__",
BooleanType.BOOLEAN, "Compacted query flag");

private static Set<PulsarInternalColumn> internalFields = ImmutableSet.of(PARTITION, EVENT_TIME, PUBLISH_TIME,
MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY, PROPERTIES);
MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY, PROPERTIES, COMPACTED_QUERY);


private final String name;
Expand Down
Loading