From 769d8a3384fff7c5b7745e910be8e4e645d89c6d Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 2 Feb 2026 09:18:33 -0600 Subject: [PATCH] Add agent-hcd module for HCD support --- README.md | 10 +- agent-distribution/build.gradle | 3 + agent-hcd/build.gradle | 102 ++++ agent-hcd/gradle.properties | 2 + .../com/datastax/oss/cdc/agent/Agent.java | 95 ++++ .../datastax/oss/cdc/agent/CdcMetrics.java | 54 ++ .../cdc/agent/CommitLogReadHandlerImpl.java | 490 ++++++++++++++++++ .../cdc/agent/CommitLogReaderServiceImpl.java | 91 ++++ .../com/datastax/oss/cdc/agent/Mutation.java | 68 +++ .../datastax/oss/cdc/agent/MutationMaker.java | 39 ++ .../oss/cdc/agent/PulsarMutationSender.java | 162 ++++++ gradle.properties | 3 + settings.gradle | 3 + 13 files changed, 1120 insertions(+), 2 deletions(-) create mode 100644 agent-hcd/build.gradle create mode 100644 agent-hcd/gradle.properties create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Agent.java create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CdcMetrics.java create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReadHandlerImpl.java create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Mutation.java create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/MutationMaker.java create mode 100644 agent-hcd/src/main/java/com/datastax/oss/cdc/agent/PulsarMutationSender.java diff --git a/README.md b/README.md index e8e2c895..faafeca0 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,9 @@ Supported Cassandra version: * Cassandra 3.11+ * Cassandra 4.0+ * [DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise) 6.8.16+ +* [DataStax Hyper-Converged Database (HCD)](https://docs.datastax.com/en/hyper-converged-database/1.2/get-started/hcd-introduction.html) 1.2.3+ -Note: Only Cassandra 4.0 and DSE 6.8.16+ support the near realtime CDC allowing to replicate data as soon as they are synced on disk. +Note: Only Cassandra 4.0, DSE 6.8.16+, and HCD 1.2.3+ support the near realtime CDC allowing to replicate data as soon as they are synced on disk. ## Documentation @@ -83,10 +84,15 @@ Cassandra supported CQL3 data types (with the associated AVRO type or logical-ty ./gradlew assemble -Note: Artifacts for DSE agent are excluded by default. To build the `agent-dse4` module, specify the `dse4` property: +Note: Artifacts for HCD and DSE agents are excluded by default. To build these modules specify the appropriate properties: +the `agent-dse4` module, specify the `dse4` property: + # agent-dse4 ./gradlew assemble -Pdse4 + # agent-hcd + ./gradlew assemble -Phcd + ## Acknowledgments Apache Cassandra, Apache Pulsar, Cassandra and Pulsar are trademarks of the Apache Software Foundation. diff --git a/agent-distribution/build.gradle b/agent-distribution/build.gradle index af7cf82d..c3b1dc32 100644 --- a/agent-distribution/build.gradle +++ b/agent-distribution/build.gradle @@ -12,6 +12,9 @@ dependencies { if (project.hasProperty("dse4")) { release project(path: ':agent-dse4', configuration: 'shadow') } + if (project.hasProperty("hcd")) { + release project(path: ':agent-hcd', configuration: 'shadow') + } } distributions { diff --git a/agent-hcd/build.gradle b/agent-hcd/build.gradle new file mode 100644 index 00000000..c1680e56 --- /dev/null +++ b/agent-hcd/build.gradle @@ -0,0 +1,102 @@ +plugins { + id 'java-library' + id 'java' + id 'application' + id "com.github.johnrengelman.shadow" +} + +application { + mainClass = "$mainClassName" +} + +jar { + duplicatesStrategy = DuplicatesStrategy.INCLUDE + manifest { + attributes 'Premain-Class': "$mainClassName" + attributes 'Implementation-Version': project.version + } + zip64=true +} + +tasks.jar.dependsOn project(':commons').jar +tasks.jar.dependsOn project(':agent').jar + +compileJava { + // Remove -Werror to avoid issues with guava annotation warnings + options.compilerArgs = [ '-parameters', '-Xlint:all', '-Xlint:-processing', '-Xlint:-serial'] +} + +compileTestJava { + options.compilerArgs += '-parameters' +} +sourceSets { + // Make the compileOnly dependencies available when compiling/running tests + test.compileClasspath += configurations.compileClasspath + test.runtimeClasspath += configurations.compileClasspath +} + + +configurations { + custom +} + +dependencies { + custom project(':commons') + custom project(':agent') + + implementation project(':commons') + implementation project(':agent') + + implementation("org.apache.avro:avro:${avroVersion}") + implementation("${pulsarGroup}:pulsar-client:${pulsarVersion}") + + // Use local HCD dse-db JAR file + compileOnly files("${hcdHome}/resources/cassandra/lib/dse-db-${hcdDbVersion}.jar") + compileOnly files("${hcdHome}/resources/cassandra/lib/metrics-core-3.1.5.jar") + compileOnly files("${hcdHome}/resources/cassandra/lib/guava-33.4.0-jre.jar") + + testCompileOnly files("${hcdHome}/resources/cassandra/lib/dse-db-${hcdDbVersion}.jar") + testCompileOnly files("${hcdHome}/resources/cassandra/lib/metrics-core-3.1.5.jar") + testCompileOnly files("${hcdHome}/resources/cassandra/lib/guava-33.4.0-jre.jar") + + testImplementation files("${hcdHome}/resources/cassandra/lib/dse-db-${hcdDbVersion}.jar") + testImplementation files("${hcdHome}/resources/cassandra/lib/metrics-core-3.1.5.jar") + testImplementation files("${hcdHome}/resources/cassandra/lib/guava-33.4.0-jre.jar") + + testImplementation "org.apache.cassandra:java-driver-core:${ossDriverVersion}" + testImplementation "org.apache.cassandra:java-driver-query-builder:${ossDriverVersion}" + + testImplementation "org.testcontainers:testcontainers:${testContainersVersion}" + testImplementation project(':testcontainers') + + testRuntimeOnly "org.slf4j:slf4j-api:${slf4jVersion}" + testRuntimeOnly "ch.qos.logback:logback-classic:${logbackVersion}" +} + +test { + // Add dependency on jar task, since it will be main target for testing + dependsOn shadowJar + + // Rearrange test classpath, add compiled JAR instead of main classes + classpath = project.sourceSets.test.output + configurations.testRuntimeClasspath + files(shadowJar.archiveFile) + + useJUnitPlatform() + + environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag + environment 'CASSANDRA_IMAGE', 'datastax/hcd:' + hcdVersion + + systemProperty "buildDir", buildDir + systemProperty "projectVersion", project.version +} + +shadowJar { + manifest { + inheritFrom project.tasks.jar.manifest + } + configurations = [project.configurations.custom] + // relocate AVRO because dse-db depends on avro 1.7.7 + relocate 'org.apache.avro', 'com.datastax.oss.cdc.avro' +} + +jar.enabled = true +assemble.dependsOn(shadowJar) diff --git a/agent-hcd/gradle.properties b/agent-hcd/gradle.properties new file mode 100644 index 00000000..b00dc2e1 --- /dev/null +++ b/agent-hcd/gradle.properties @@ -0,0 +1,2 @@ +artifact=agent-hcd +mainClassName=com.datastax.oss.cdc.agent.Agent diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Agent.java new file mode 100644 index 00000000..c88ac9f4 --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -0,0 +1,95 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import lombok.extern.slf4j.Slf4j; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.service.StorageService; + +import java.lang.instrument.Instrumentation; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class Agent { + public static void premain(String agentArgs, Instrumentation inst) { + log.info("[Agent] In premain method"); + try { + main(agentArgs, inst); + } catch(Exception e) { + log.error("error:", e); + System.exit(-1); + } + } + + public static void agentmain(String agentArgs, Instrumentation inst) { + log.info("[Agent] In agentmain method"); + try { + main(agentArgs, inst); + } catch(Exception e) { + log.error("error:", e); + System.exit(-1); + } + } + + static void main(String agentArgs, Instrumentation inst) throws Exception { + DatabaseDescriptor.daemonInitialization(); + if (DatabaseDescriptor.isCDCEnabled() == false) { + log.error("cdc_enabled=false in your cassandra configuration, CDC agent not started."); + } else if (DatabaseDescriptor.getCDCLogLocation() == null) { + log.error("cdc_raw_directory=null in your cassandra configuration, CDC agent not started."); + } else { + startCdcAgent(agentArgs); + } + } + + static void startCdcAgent(String agentArgs) throws Exception { + String agentVersion = Agent.class.getPackage().getImplementationVersion(); + log.info("Starting CDC agent v{}, cdc_raw_directory={}", agentVersion, DatabaseDescriptor.getCDCLogLocation()); + AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs); + + SegmentOffsetFileWriter segmentOffsetFileWriter = new SegmentOffsetFileWriter(config.cdcWorkingDir); + segmentOffsetFileWriter.loadOffsets(); + + PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); + CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation().path(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true); + + commitLogReaderService.initialize(); + + // detect commitlogs file and submit new/modified files to the commitLogReader + ExecutorService commitLogExecutor = Executors.newSingleThreadExecutor(); + commitLogExecutor.submit(() -> { + try { + do { + // wait to initialize the hostID before starting + Thread.sleep(1000); + } while(StorageService.instance.getLocalHostUUID() == null); + + commitLogProcessor.initialize(); + commitLogProcessor.start(); + } catch(Exception e) { + log.error("commitLogProcessor error:", e); + } + }); + + ExecutorService commitLogServiceExecutor = Executors.newSingleThreadExecutor(); + commitLogServiceExecutor.submit(commitLogReaderService); + + log.info("CDC agent started"); + } +} diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CdcMetrics.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CdcMetrics.java new file mode 100644 index 00000000..9305ff2f --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CdcMetrics.java @@ -0,0 +1,54 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import org.apache.cassandra.metrics.DefaultNameFactory; +import org.apache.cassandra.metrics.MetricNameFactory; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +public class CdcMetrics { + public static final String CDC_AGENT_MBEAN_NAME = "CdcAgent"; + private static final MetricNameFactory factory = new DefaultNameFactory(CDC_AGENT_MBEAN_NAME); + + public static final Counter sentMutations = Metrics.counter(factory.createMetricName("SentMutations")); + public static final Counter sentErrors = Metrics.counter(factory.createMetricName("SentErrors")); + + public static final Counter commitLogReadErrors = Metrics.counter(factory.createMetricName("CommitLogReadErrors")); + public static final Counter skippedMutations = Metrics.counter(factory.createMetricName("SkippedMutations")); + + public static final Counter executedTasks = Metrics.counter(factory.createMetricName("ExecutedTasks")); + + public static final Gauge submittedTasksGauge = Metrics.register(factory.createMetricName("SubmittedTasks"), + CommitLogReaderService.submittedTasks::size); + + public static final Gauge maxSubmittedTasks = Metrics.register(factory.createMetricName("MaxSubmittedTasks"), + CommitLogReaderService.maxSubmittedTasks::get); + + public static final Gauge pendingTasksGauge = Metrics.register(factory.createMetricName("PendingTasks"), + CommitLogReaderService.pendingTasks::size); + + public static final Gauge maxPendingTasks = Metrics.register(factory.createMetricName("MaxPendingTasks"), + CommitLogReaderService.maxPendingTasks::get); + + public static final Gauge uncleanedTasksGauge = Metrics.register(factory.createMetricName("UncleanedTasks"), + CommitLogReaderService.pendingTasks::size); + + public static final Gauge maxUncleanedTasks = Metrics.register(factory.createMetricName("MaxUncleanedTasks"), + CommitLogReaderService.maxUncleanedTasks::get); +} diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReadHandlerImpl.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReadHandlerImpl.java new file mode 100644 index 00000000..2cbe1b10 --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReadHandlerImpl.java @@ -0,0 +1,490 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import com.datastax.oss.cdc.agent.exceptions.CassandraConnectorSchemaException; +import com.datastax.oss.cdc.agent.exceptions.CassandraConnectorTaskException; +import lombok.extern.slf4j.Slf4j; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.commons.codec.digest.DigestUtils; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import static com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.RowType.DELETE; + +/** + * Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code. + * + * This handler implementation processes each {@link org.apache.cassandra.db.Mutation} and invokes one of the registered partition handler + * for each {@link PartitionUpdate} in the {@link org.apache.cassandra.db.Mutation} (a mutation could have multiple partitions if it is a batch update), + * which in turn makes one or more record via the {@link AbstractMutationMaker}. + */ +@Slf4j +public class CommitLogReadHandlerImpl implements CommitLogReadHandler { + + private final AbstractMutationMaker mutationMaker; + private final MutationSender mutationSender; + private final CommitLogReaderService.Task task; + private int processedPosition; + + CommitLogReadHandlerImpl(MutationSender mutationSender, + CommitLogReaderService.Task task, + int currentPosition) { + this.mutationSender = mutationSender; + this.mutationMaker = new MutationMaker(); + this.task = task; + this.processedPosition = currentPosition; + } + + public int getProcessedPosition() { + return this.processedPosition; + } + + /** + * A PartitionType represents the type of a PartitionUpdate. + */ + enum PartitionType { + /** + * a partition-level deletion where partition key = primary key (no clustering key) + */ + PARTITION_KEY_ROW_DELETION, + + /** + * a partition-level deletion where partition key + clustering key = primary key + */ + PARTITION_AND_CLUSTERING_KEY_ROW_DELETION, + + /** + * a row-level modification + */ + ROW_LEVEL_MODIFICATION, + + /** + * an update on materialized view + */ + MATERIALIZED_VIEW, + + /** + * an update on secondary index + */ + SECONDARY_INDEX, + + /** + * an update on a table that contains counter data type + */ + COUNTER, + + /** + * a partition-level modification + */ + PARTITION_LEVEL_MODIFICATION; + + static final Set supportedPartitionTypes = new HashSet<>(Arrays.asList( + PARTITION_KEY_ROW_DELETION, + PARTITION_AND_CLUSTERING_KEY_ROW_DELETION, + ROW_LEVEL_MODIFICATION, + PARTITION_LEVEL_MODIFICATION)); + + public static PartitionType getPartitionType(PartitionUpdate pu) { + if (pu.metadata().isCounter()) { + return COUNTER; + } + else if (pu.metadata().isView()) { + return MATERIALIZED_VIEW; + } + else if (pu.metadata().isIndex()) { + return SECONDARY_INDEX; + } + else if (isPartitionDeletion(pu) && hasClusteringKeys(pu)) { + return PARTITION_AND_CLUSTERING_KEY_ROW_DELETION; + } + else if (isPartitionDeletion(pu) && !hasClusteringKeys(pu)) { + return PARTITION_KEY_ROW_DELETION; + } + else if (!pu.unfilteredIterator().hasNext()) { + return PARTITION_LEVEL_MODIFICATION; + } + else { + return ROW_LEVEL_MODIFICATION; + } + } + + public static boolean isValid(PartitionType type) { + return supportedPartitionTypes.contains(type); + } + + public static boolean hasClusteringKeys(PartitionUpdate pu) { + return !pu.metadata().clusteringColumns().isEmpty(); + } + + public static boolean isPartitionDeletion(PartitionUpdate pu) { + return pu.partitionLevelDeletion().markedForDeleteAt() > LivenessInfo.NO_TIMESTAMP; + } + } + + /** + * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. + */ + enum RowType { + /** + * Single-row insert + */ + INSERT, + + /** + * Single-row update + */ + UPDATE, + + /** + * Single-row delete + */ + DELETE, + + /** + * A row-level deletion that deletes a range of keys. + * For example: DELETE * FROM table WHERE partition_key = 1 AND clustering_key > 0; + */ + RANGE_TOMBSTONE, + + /** + * Unknown row-level operation + */ + UNKNOWN; + + static final Set supportedRowTypes = new HashSet<>(Arrays.asList(INSERT, UPDATE, DELETE)); + + public static RowType getRowType(Unfiltered unfiltered) { + if (unfiltered.isRangeTombstoneMarker()) { + return RANGE_TOMBSTONE; + } + else if (unfiltered.isRow()) { + Row row = (Row) unfiltered; + return getRowType(row); + } + return UNKNOWN; + } + + public static RowType getRowType(Row row) { + if (isDelete(row)) { + return DELETE; + } + else if (isInsert(row)) { + return INSERT; + } + else if (isUpdate(row)) { + return UPDATE; + } + return UNKNOWN; + } + + public static boolean isValid(RowType rowType) { + return supportedRowTypes.contains(rowType); + } + + public static boolean isDelete(Row row) { + return row.deletion().time().markedForDeleteAt() > LivenessInfo.NO_TIMESTAMP; + } + + public static boolean isInsert(Row row) { + return row.primaryKeyLivenessInfo().timestamp() > LivenessInfo.NO_TIMESTAMP; + } + + public static boolean isUpdate(Row row) { + return row.primaryKeyLivenessInfo().timestamp() == LivenessInfo.NO_TIMESTAMP; + } + } + + @Override + public void handleInvalidMutation(org.apache.cassandra.schema.TableId tableId) { + log.warn("Invalid mutation for table {}", tableId); + } + + @Override + public void handleMutation(org.apache.cassandra.db.Mutation mutation, int size, int entryLocation, CommitLogDescriptor descriptor) { + if (!mutation.trackedByCDC()) { + return; + } + + for (PartitionUpdate pu : mutation.getPartitionUpdates()) { + try { + DataOutputBuffer dataOutputBuffer = new DataOutputBuffer(); + org.apache.cassandra.db.Mutation.serializer.serialize(mutation, dataOutputBuffer, descriptor.getMessagingVersion()); + String md5Digest = DigestUtils.md5Hex(dataOutputBuffer.getData()); + process(pu, descriptor.id, entryLocation, md5Digest); + } + catch (Exception e) { + throw new RuntimeException(String.format("Failed to process PartitionUpdate %s at %d:%d for table %s.%s.", + pu.toString(), descriptor.id, entryLocation, pu.metadata().keyspace, pu.metadata().name), e); + } + } + } + + @Override + public void handleUnrecoverableError(CommitLogReadException exception) { + log.error("Unrecoverable error when reading commit log", exception); + CdcMetrics.commitLogReadErrors.inc(); + } + + @Override + public boolean shouldSkipSegmentOnError(CommitLogReadException exception) { + if (exception.permissible) { + log.error("Encountered a permissible exception during log replay", exception); + } + else { + log.error("Encountered a non-permissible exception during log replay", exception); + } + return false; + } + + /** + * Method which processes a partition update if it's valid (either a single-row partition-level + * deletion or a row-level modification) or throw an exception if it isn't. The valid partition + * update is then converted into a {@link AbstractMutation}. + */ + private void process(PartitionUpdate pu, long segment, int position, String md5Digest) { + PartitionType partitionType = PartitionType.getPartitionType(pu); + + if (!PartitionType.isValid(partitionType)) { + log.warn("Encountered an unsupported partition type {}, skipping...", partitionType); + return; + } + + switch (partitionType) { + case PARTITION_AND_CLUSTERING_KEY_ROW_DELETION: + case PARTITION_KEY_ROW_DELETION: { + handlePartitionDeletion(pu, segment, position, md5Digest); + } + break; + + case PARTITION_LEVEL_MODIFICATION: { + UnfilteredRowIterator it = pu.unfilteredIterator(); + Row row = it.staticRow(); + RowType rowType = RowType.getRowType(row); + handleRowModifications(row, rowType, pu, segment, position, md5Digest); + } + break; + + case ROW_LEVEL_MODIFICATION: { + UnfilteredRowIterator it = pu.unfilteredIterator(); + while (it.hasNext()) { + Unfiltered rowOrRangeTombstone = it.next(); + RowType rowType = RowType.getRowType(rowOrRangeTombstone); + if (!RowType.isValid(rowType)) { + log.warn("Encountered an unsupported row type {}, skipping...", rowType); + continue; + } + Row row = (Row) rowOrRangeTombstone; + + handleRowModifications(row, rowType, pu, segment, position, md5Digest); + } + } + break; + + default: + throw new CassandraConnectorSchemaException("Unsupported partition type " + partitionType + " should have been skipped"); + } + } + + /** + * Handle a valid deletion event resulted from a partition-level deletion by converting Cassandra representation + * of this event into a {@link AbstractMutation} object and send it to pulsar. A valid deletion + * event means a partition only has a single row, this implies there are no clustering keys. + */ + private void handlePartitionDeletion(PartitionUpdate pu, long segment, int position, String md5Digest) { + try { + Object[] after = new Object[pu.metadata().partitionKeyColumns().size() + pu.metadata().clusteringColumns().size()]; + populatePartitionColumns(after, pu); + mutationMaker.delete(StorageService.instance.getLocalHostUUID(), segment, position, + pu.maxTimestamp(), after, this::sendAsync, md5Digest, pu.metadata(), pu.partitionKey().getToken().getTokenValue()); + } + catch (Exception e) { + log.error("Fail to send delete partition at {}:{}. Reason: {}", segment, position, e); + } + } + + /** + * Handle a valid event resulted from a row-level modification by converting Cassandra representation of + * this event into a {@link AbstractMutation} object and sent it to pulsar. A valid event + * implies this must be an insert, update, or delete. + */ + private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, + long segment, int position, String md5Digest) { + Object[] after = new Object[pu.metadata().partitionKeyColumns().size() + pu.metadata().clusteringColumns().size()]; + populatePartitionColumns(after, pu); + populateClusteringColumns(after, row, pu); + + long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp(); + switch (rowType) { + case INSERT: + mutationMaker.insert(StorageService.instance.getLocalHostUUID(), segment, position, + ts, after, this::sendAsync, md5Digest, pu.metadata(), pu.partitionKey().getToken().getTokenValue()); + break; + + case UPDATE: + mutationMaker.update(StorageService.instance.getLocalHostUUID(), segment, position, + ts, after, this::sendAsync, md5Digest, pu.metadata(), pu.partitionKey().getToken().getTokenValue()); + break; + + case DELETE: + mutationMaker.delete(StorageService.instance.getLocalHostUUID(), segment, position, + ts, after, this::sendAsync, md5Digest, pu.metadata(), pu.partitionKey().getToken().getTokenValue()); + break; + + default: + throw new CassandraConnectorTaskException("Unsupported row type " + rowType + " should have been skipped"); + } + } + + private void populatePartitionColumns(Object[] after, PartitionUpdate pu) { + List partitionKeys = getPartitionKeys(pu); + int i = 0; + for (ColumnMetadata cd : pu.metadata().partitionKeyColumns()) { + try { + after[i++] = partitionKeys.get(cd.position()); + } + catch (Exception e) { + throw new RuntimeException(String.format("Failed to populate Column %s with Type %s of Table %s in KeySpace %s.", + cd.name.toString(), cd.type.toString(), cd.cfName, cd.ksName), e); + } + } + } + + @SuppressWarnings({"unchecked","rawtypes"}) + private void populateClusteringColumns(Object[] after, Row row, PartitionUpdate pu) { + int i = pu.metadata().partitionKeyColumns().size(); + for (ColumnMetadata cd : pu.metadata().clusteringColumns().stream().limit(row.clustering().size()).collect(Collectors.toList())) { + try { + ValueAccessor valueAccessor = row.clustering().accessor(); + after[i++] = cd.type.compose(valueAccessor.toBuffer(row.clustering().get(cd.position()))); + } + catch (Exception e) { + throw new RuntimeException(String.format("Failed to populate Column %s with Type %s of Table %s in KeySpace %s.", + cd.name.toString(), cd.type.toString(), cd.cfName, cd.ksName), e); + } + } + } + + /** + * Given a PartitionUpdate, deserialize the partition key byte buffer + * into a list of partition key values. + */ + @SuppressWarnings("checkstyle:magicnumber") + private static List getPartitionKeys(PartitionUpdate pu) { + List values = new ArrayList<>(pu.metadata().partitionKeyColumns().size()); + List columnDefinitions = pu.metadata().partitionKeyColumns(); + + // simple partition key + if (columnDefinitions.size() == 1) { + ByteBuffer bb = pu.partitionKey().getKey(); + ColumnSpecification cs = columnDefinitions.get(0); + AbstractType type = cs.type; + try { + Object value = type.compose(bb); + values.add(value); + } + catch (Exception e) { + throw new RuntimeException(String.format("Failed to deserialize Column %s with Type %s in Table %s and KeySpace %s.", + cs.name.toString(), cs.type.toString(), cs.cfName, cs.ksName), e); + } + } + else { + ByteBuffer keyBytes = pu.partitionKey().getKey().duplicate(); + + // 0xFFFF is reserved to encode "static column", skip if it exists at the start + if (keyBytes.remaining() >= 2) { + int header = ByteBufferUtil.getShortLength(keyBytes, keyBytes.position()); + if ((header & 0xFFFF) == 0xFFFF) { + ByteBufferUtil.readShortLength(keyBytes); + } + } + + // the encoding of columns in the partition key byte buffer is + // ... + // where is: + // + // is a 2 bytes unsigned short (excluding 0xFFFF used to encode "static columns") + // should always be 0 for columns (1 for query bounds) + // this section reads the bytes for each column and deserialize into objects based on each column type + int i = 0; + while (keyBytes.remaining() > 0 && i < columnDefinitions.size()) { + ColumnSpecification cs = columnDefinitions.get(i); + AbstractType type = cs.type; + ByteBuffer bb = ByteBufferUtil.readBytesWithShortLength(keyBytes); + try { + Object value = type.compose(bb); + values.add(value); + } + catch (Exception e) { + throw new RuntimeException(String.format("Failed to deserialize Column %s with Type %s in Table %s and KeySpace %s", + cs.name.toString(), cs.type.toString(), cs.cfName, cs.ksName), e); + } + byte b = keyBytes.get(); + if (b != 0) { + break; + } + ++i; + } + } + + return values; + } + + public void sendAsync(Mutation mutation) { + log.debug("Sending mutation={}", mutation); + try { + task.inflightMessagesSemaphore.acquireUninterruptibly(); // may block + this.mutationSender.sendMutationAsync(mutation) + .handle((msgId, t)-> { + if (t == null) { + CdcMetrics.sentMutations.inc(); + log.debug("Sent mutation={}", mutation); + } else { + if (t instanceof CassandraConnectorSchemaException) { + log.error("Invalid primary key schema:", t); + CdcMetrics.skippedMutations.inc(); + } else { + CdcMetrics.sentErrors.inc(); + log.debug("Sent failed mutation=" + mutation, t); + task.lastException = t; + } + } + task.inflightMessagesSemaphore.release(); + return msgId; + }); + this.processedPosition = Math.max(this.processedPosition, mutation.getPosition()); + } catch(Exception e) { + log.error("Send failed:", e); + CdcMetrics.sentErrors.inc(); + } + } +} diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java new file mode 100644 index 00000000..d8c30c1b --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -0,0 +1,91 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import lombok.extern.slf4j.Slf4j; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.commitlog.CommitLogReader; +import org.apache.cassandra.schema.TableMetadata; + +import java.io.File; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.function.IntBinaryOperator; + +/** + * Consume a queue of commitlog files to read mutations. + */ +@Slf4j +public class CommitLogReaderServiceImpl extends CommitLogReaderService { + + public CommitLogReaderServiceImpl(AgentConfig config, + MutationSender mutationSender, + SegmentOffsetWriter segmentOffsetWriter, + CommitLogTransfer commitLogTransfer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + this.tasksExecutor = new JMXEnabledThreadPoolExecutor( + config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, + config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, + 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("CdcCommitlogProcessor"), + CdcMetrics.CDC_AGENT_MBEAN_NAME); + } + + @SuppressWarnings("unchecked") + public Task createTask(String filename, long segment, int syncPosition, boolean completed) { + return new Task(filename, segment, syncPosition, completed) { + + public void run() { + log.debug("Starting task={} lasSentPosition={}", this, segmentOffsetWriter.position(Optional.empty(), segment)); + File file = getFile(); + try { + int lastSentPosition = -1; + if (!file.exists()) { + log.warn("CL file={} does not exist any more, ignoring", file.getName()); + finish(TaskStatus.SUCCESS, -1); + return; + } + long seg = CommitLogUtil.extractTimestamp(file.getName()); + + int currentPosition = segmentOffsetWriter.position(Optional.empty(), seg); + if (syncPosition >= currentPosition) { + CommitLogPosition minPosition = new CommitLogPosition(seg, currentPosition); + CommitLogReadHandlerImpl commitLogReadHandler = new CommitLogReadHandlerImpl((MutationSender) mutationSender, this, currentPosition); + CommitLogReader commitLogReader = new CommitLogReader(); + org.apache.cassandra.io.util.File cassandraFile = new org.apache.cassandra.io.util.File(file.getAbsolutePath()); + commitLogReader.readCommitLogSegment(commitLogReadHandler, cassandraFile, minPosition, false); + lastSentPosition = commitLogReadHandler.getProcessedPosition(); + } + finish(TaskStatus.SUCCESS, lastSentPosition); + } catch (Exception e) { + log.warn("Task failed {}", this, e); + finish(TaskStatus.ERROR, -1); + } finally { + CdcMetrics.executedTasks.inc(); + } + } + + @Override + public File getFile() { + return new File(DatabaseDescriptor.getCDCLogLocation().path(), filename); + } + }; + } +} diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Mutation.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Mutation.java new file mode 100644 index 00000000..14ad1d50 --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Mutation.java @@ -0,0 +1,68 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class Mutation extends AbstractMutation { + + public Mutation(UUID nodeId, Long segment, int position, Object[] pkValues, long tsMicro, String md5Digest, TableMetadata t, Object token) { + super(nodeId, segment, position, pkValues, tsMicro, md5Digest, t, token); + } + + @Override + public String key() { + return metadata.keyspace + "." + metadata.name; + } + + @Override + public String name() { + return metadata.name; + } + + @Override + public String keyspace() { + return metadata.keyspace; + } + + @Override + public List primaryKeyColumns() { + List columnInfos = new ArrayList<>(); + for(ColumnMetadata cm : metadata.primaryKeyColumns()) + columnInfos.add(new ColumnInfo() { + @Override + public String name() { + return cm.name.toString(); + } + + @Override + public String cql3Type() { + return cm.type.asCQL3Type().toString(); + } + + @Override + public boolean isClusteringKey() { + return cm.isClusteringColumn(); + } + }); + return columnInfos; + } +} diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/MutationMaker.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/MutationMaker.java new file mode 100644 index 00000000..a58337f5 --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/MutationMaker.java @@ -0,0 +1,39 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import com.datastax.oss.cdc.agent.exceptions.CassandraConnectorTaskException; +import lombok.extern.slf4j.Slf4j; +import org.apache.cassandra.schema.TableMetadata; + +import java.util.UUID; + +@Slf4j +public class MutationMaker extends AbstractMutationMaker { + + public void createRecord(UUID nodeId, long segment, int position, + long tsMicro, Object[] pkValues, + BlockingConsumer consumer, + String md5Digest, TableMetadata t, Object token) { + Mutation record = new Mutation(nodeId, segment, position, pkValues, tsMicro, md5Digest, t, token); + try { + consumer.accept(record); + } catch (InterruptedException e) { + log.error("Interruption while enqueuing Change Event {}", record); + throw new CassandraConnectorTaskException("Enqueuing has been interrupted: ", e); + } + } +} diff --git a/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/PulsarMutationSender.java b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/PulsarMutationSender.java new file mode 100644 index 00000000..d363a0f3 --- /dev/null +++ b/agent-hcd/src/main/java/com/datastax/oss/cdc/agent/PulsarMutationSender.java @@ -0,0 +1,162 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import com.datastax.oss.cdc.CqlLogicalTypes; +import com.google.common.collect.ImmutableMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BooleanType; +import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.DecimalType; +import org.apache.cassandra.db.marshal.DoubleType; +import org.apache.cassandra.db.marshal.DurationType; +import org.apache.cassandra.db.marshal.FloatType; +import org.apache.cassandra.db.marshal.InetAddressType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.ReversedType; +import org.apache.cassandra.db.marshal.ShortType; +import org.apache.cassandra.db.marshal.SimpleDateType; +import org.apache.cassandra.db.marshal.TimeType; +import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.apache.cassandra.db.marshal.TimestampType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; + +import java.net.InetAddress; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Date; +import java.util.UUID; + +@Slf4j +public class PulsarMutationSender extends AbstractPulsarMutationSender { + + private static final ImmutableMap avroSchemaTypes = ImmutableMap.builder() + .put(UTF8Type.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) + .put(AsciiType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) + .put(BooleanType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) + .put(BytesType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) + .put(ByteType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) // INT8 not supported by AVRO + .put(ShortType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) // INT16 not supported by AVRO + .put(Int32Type.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) + .put(IntegerType.instance.asCQL3Type().toString(), CqlLogicalTypes.varintType) + .put(LongType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) + .put(FloatType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) + .put(DoubleType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) + .put(DecimalType.instance.asCQL3Type().toString(), CqlLogicalTypes.decimalType) + .put(InetAddressType.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) + .put(TimestampType.instance.asCQL3Type().toString(), CqlLogicalTypes.timestampMillisType) + .put(SimpleDateType.instance.asCQL3Type().toString(), CqlLogicalTypes.dateType) + .put(TimeType.instance.asCQL3Type().toString(), CqlLogicalTypes.timeMicrosType) + .put(DurationType.instance.asCQL3Type().toString(), CqlLogicalTypes.durationType) + .put(UUIDType.instance.asCQL3Type().toString(), CqlLogicalTypes.uuidType) + .put(TimeUUIDType.instance.asCQL3Type().toString(), CqlLogicalTypes.uuidType) + .build(); + + public PulsarMutationSender(AgentConfig config) { + super(config, DatabaseDescriptor.getPartitionerName().equals(Murmur3Partitioner.class.getName())); + } + + @Override + public void incSkippedMutations() { + CdcMetrics.skippedMutations.inc(); + } + + @Override + public UUID getHostId() { + return StorageService.instance.getLocalHostUUID(); + } + + @Override + public org.apache.avro.Schema getNativeSchema(String cql3Type) { + return avroSchemaTypes.get(cql3Type); + } + + @Override + public SchemaAndWriter getPkSchema(String key) { + return pkSchemas.get(key); + } + + /** + * Check the primary key has supported columns. + * @param mutation + * @return false if the primary key has unsupported CQL columns + */ + @Override + public boolean isSupported(final AbstractMutation mutation) { + if (!pkSchemas.containsKey(mutation.key())) { + for (ColumnMetadata cm : mutation.metadata.primaryKeyColumns()) { + if (!avroSchemaTypes.containsKey(cm.type.asCQL3Type().toString())) { + log.warn("Unsupported primary key column={}.{}.{} type={}, skipping mutation", cm.ksName, cm.cfName, cm.name, cm.type.asCQL3Type().toString()); + return false; + } + } + } + return true; + } + + @Override + public Object cqlToAvro(TableMetadata tableMetadata, String columnName, Object value) { + ColumnMetadata columnMetadata = tableMetadata.getColumn(ColumnIdentifier.getInterned(columnName, false)); + AbstractType type = columnMetadata.type.isReversed() ? ((ReversedType) columnMetadata.type).baseType : columnMetadata.type; + log.trace("column name={} type={} class={} value={}", + columnMetadata.name, type.getClass().getName(), + value != null ? value.getClass().getName() : null, value); + + if (value == null) + return null; + + if (type instanceof TimestampType) { + if (value instanceof Date) + return ((Date) value).getTime(); + if (value instanceof Instant) + return ((Instant) value).toEpochMilli(); + } + if (type instanceof SimpleDateType && value instanceof Integer) { + long timeInMillis = Duration.ofDays((Integer) value + Integer.MIN_VALUE).toMillis(); + Instant instant = Instant.ofEpochMilli(timeInMillis); + LocalDate localDate = LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate(); + return (int) localDate.toEpochDay(); // Avro date is an int that stores the number of days from the unix epoch + } + if (type instanceof TimeType && value instanceof Long) { + return ((Long) value / 1000); // Avro time is in microseconds + } + if (type instanceof InetAddressType) { + return ((InetAddress) value).getHostAddress(); + } + if (type instanceof ByteType) { + return Byte.toUnsignedInt((byte) value); // AVRO does not support INT8 + } + if (type instanceof ShortType) { + return Short.toUnsignedInt((short) value); // AVRO does not support INT16 + } + return value; + } +} diff --git a/gradle.properties b/gradle.properties index f9c8d811..e3991c6d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,6 +14,9 @@ ossDriverVersion=4.19.2 cassandra3Version=3.11.19 cassandra4Version=4.0.4 dse4Version=6.8.61 +hcdVersion=1.2.3 +hcdDbVersion=4.0.11.0-0c7d6f7c9412 +hcdHome=/tmp/hcd-1.2.3 pulsarGroup=org.apache.pulsar pulsarVersion=3.0.0 diff --git a/settings.gradle b/settings.gradle index e23f93e5..3ea568b5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,9 @@ include 'agent-c4' if (startParameter.projectProperties.containsKey("dse4")) { include 'agent-dse4' } +if (startParameter.projectProperties.containsKey("hcd")) { + include 'agent-hcd' +} include 'agent-distribution' include 'connector'