Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ Supported Cassandra version:
* Cassandra 3.11+
* Cassandra 4.0+
* [DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise) 6.8.16+
* [DataStax Hyper-Converged Database (HCD)](https://docs.datastax.com/en/hyper-converged-database/1.2/get-started/hcd-introduction.html) 1.2.3+

Note: Only Cassandra 4.0 and DSE 6.8.16+ support the near realtime CDC allowing to replicate data as soon as they are synced on disk.
Note: Only Cassandra 4.0, DSE 6.8.16+, and HCD 1.2.3+ support the near realtime CDC allowing to replicate data as soon as they are synced on disk.

## Documentation

Expand Down Expand Up @@ -83,10 +84,15 @@ Cassandra supported CQL3 data types (with the associated AVRO type or logical-ty

./gradlew assemble

Note: Artifacts for DSE agent are excluded by default. To build the `agent-dse4` module, specify the `dse4` property:
Note: Artifacts for HCD and DSE agents are excluded by default. To build these modules specify the appropriate properties:
the `agent-dse4` module, specify the `dse4` property:

# agent-dse4
./gradlew assemble -Pdse4

# agent-hcd
./gradlew assemble -Phcd

## Acknowledgments

Apache Cassandra, Apache Pulsar, Cassandra and Pulsar are trademarks of the Apache Software Foundation.
Expand Down
3 changes: 3 additions & 0 deletions agent-distribution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ dependencies {
if (project.hasProperty("dse4")) {
release project(path: ':agent-dse4', configuration: 'shadow')
}
if (project.hasProperty("hcd")) {
release project(path: ':agent-hcd', configuration: 'shadow')
}
}

distributions {
Expand Down
102 changes: 102 additions & 0 deletions agent-hcd/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
plugins {
id 'java-library'
id 'java'
id 'application'
id "com.github.johnrengelman.shadow"
}

application {
mainClass = "$mainClassName"
}

jar {
duplicatesStrategy = DuplicatesStrategy.INCLUDE
manifest {
attributes 'Premain-Class': "$mainClassName"
attributes 'Implementation-Version': project.version
}
zip64=true
}

tasks.jar.dependsOn project(':commons').jar
tasks.jar.dependsOn project(':agent').jar

compileJava {
// Remove -Werror to avoid issues with guava annotation warnings
options.compilerArgs = [ '-parameters', '-Xlint:all', '-Xlint:-processing', '-Xlint:-serial']
}

compileTestJava {
options.compilerArgs += '-parameters'
}
sourceSets {
// Make the compileOnly dependencies available when compiling/running tests
test.compileClasspath += configurations.compileClasspath
test.runtimeClasspath += configurations.compileClasspath
}


configurations {
custom
}

dependencies {
custom project(':commons')
custom project(':agent')

implementation project(':commons')
implementation project(':agent')

implementation("org.apache.avro:avro:${avroVersion}")
implementation("${pulsarGroup}:pulsar-client:${pulsarVersion}")

// Use local HCD dse-db JAR file
compileOnly files("${hcdHome}/resources/cassandra/lib/dse-db-${hcdDbVersion}.jar")
compileOnly files("${hcdHome}/resources/cassandra/lib/metrics-core-3.1.5.jar")
compileOnly files("${hcdHome}/resources/cassandra/lib/guava-33.4.0-jre.jar")

testCompileOnly files("${hcdHome}/resources/cassandra/lib/dse-db-${hcdDbVersion}.jar")
testCompileOnly files("${hcdHome}/resources/cassandra/lib/metrics-core-3.1.5.jar")
testCompileOnly files("${hcdHome}/resources/cassandra/lib/guava-33.4.0-jre.jar")

testImplementation files("${hcdHome}/resources/cassandra/lib/dse-db-${hcdDbVersion}.jar")
testImplementation files("${hcdHome}/resources/cassandra/lib/metrics-core-3.1.5.jar")
testImplementation files("${hcdHome}/resources/cassandra/lib/guava-33.4.0-jre.jar")

testImplementation "org.apache.cassandra:java-driver-core:${ossDriverVersion}"
testImplementation "org.apache.cassandra:java-driver-query-builder:${ossDriverVersion}"

testImplementation "org.testcontainers:testcontainers:${testContainersVersion}"
testImplementation project(':testcontainers')

testRuntimeOnly "org.slf4j:slf4j-api:${slf4jVersion}"
testRuntimeOnly "ch.qos.logback:logback-classic:${logbackVersion}"
}

test {
// Add dependency on jar task, since it will be main target for testing
dependsOn shadowJar

// Rearrange test classpath, add compiled JAR instead of main classes
classpath = project.sourceSets.test.output + configurations.testRuntimeClasspath + files(shadowJar.archiveFile)

useJUnitPlatform()

environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
environment 'CASSANDRA_IMAGE', 'datastax/hcd:' + hcdVersion

systemProperty "buildDir", buildDir
systemProperty "projectVersion", project.version
}

shadowJar {
manifest {
inheritFrom project.tasks.jar.manifest
}
configurations = [project.configurations.custom]
// relocate AVRO because dse-db depends on avro 1.7.7
relocate 'org.apache.avro', 'com.datastax.oss.cdc.avro'
}

jar.enabled = true
assemble.dependsOn(shadowJar)
2 changes: 2 additions & 0 deletions agent-hcd/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
artifact=agent-hcd
mainClassName=com.datastax.oss.cdc.agent.Agent
95 changes: 95 additions & 0 deletions agent-hcd/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

import lombok.extern.slf4j.Slf4j;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.StorageService;

import java.lang.instrument.Instrumentation;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class Agent {
public static void premain(String agentArgs, Instrumentation inst) {
log.info("[Agent] In premain method");
try {
main(agentArgs, inst);
} catch(Exception e) {
log.error("error:", e);
System.exit(-1);
}
}

public static void agentmain(String agentArgs, Instrumentation inst) {
log.info("[Agent] In agentmain method");
try {
main(agentArgs, inst);
} catch(Exception e) {
log.error("error:", e);
System.exit(-1);
}
}

static void main(String agentArgs, Instrumentation inst) throws Exception {
DatabaseDescriptor.daemonInitialization();
if (DatabaseDescriptor.isCDCEnabled() == false) {
log.error("cdc_enabled=false in your cassandra configuration, CDC agent not started.");
} else if (DatabaseDescriptor.getCDCLogLocation() == null) {
log.error("cdc_raw_directory=null in your cassandra configuration, CDC agent not started.");
} else {
startCdcAgent(agentArgs);
}
}

static void startCdcAgent(String agentArgs) throws Exception {
String agentVersion = Agent.class.getPackage().getImplementationVersion();
log.info("Starting CDC agent v{}, cdc_raw_directory={}", agentVersion, DatabaseDescriptor.getCDCLogLocation());
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);

SegmentOffsetFileWriter segmentOffsetFileWriter = new SegmentOffsetFileWriter(config.cdcWorkingDir);
segmentOffsetFileWriter.loadOffsets();

PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config);
CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config);
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer);
CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation().path(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true);

commitLogReaderService.initialize();

// detect commitlogs file and submit new/modified files to the commitLogReader
ExecutorService commitLogExecutor = Executors.newSingleThreadExecutor();
commitLogExecutor.submit(() -> {
try {
do {
// wait to initialize the hostID before starting
Thread.sleep(1000);
} while(StorageService.instance.getLocalHostUUID() == null);

commitLogProcessor.initialize();
commitLogProcessor.start();
} catch(Exception e) {
log.error("commitLogProcessor error:", e);
}
});

ExecutorService commitLogServiceExecutor = Executors.newSingleThreadExecutor();
commitLogServiceExecutor.submit(commitLogReaderService);

log.info("CDC agent started");
}
}
54 changes: 54 additions & 0 deletions agent-hcd/src/main/java/com/datastax/oss/cdc/agent/CdcMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.MetricNameFactory;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

public class CdcMetrics {
public static final String CDC_AGENT_MBEAN_NAME = "CdcAgent";
private static final MetricNameFactory factory = new DefaultNameFactory(CDC_AGENT_MBEAN_NAME);

public static final Counter sentMutations = Metrics.counter(factory.createMetricName("SentMutations"));
public static final Counter sentErrors = Metrics.counter(factory.createMetricName("SentErrors"));

public static final Counter commitLogReadErrors = Metrics.counter(factory.createMetricName("CommitLogReadErrors"));
public static final Counter skippedMutations = Metrics.counter(factory.createMetricName("SkippedMutations"));

public static final Counter executedTasks = Metrics.counter(factory.createMetricName("ExecutedTasks"));

public static final Gauge<Integer> submittedTasksGauge = Metrics.register(factory.createMetricName("SubmittedTasks"),
CommitLogReaderService.submittedTasks::size);

public static final Gauge<Integer> maxSubmittedTasks = Metrics.register(factory.createMetricName("MaxSubmittedTasks"),
CommitLogReaderService.maxSubmittedTasks::get);

public static final Gauge<Integer> pendingTasksGauge = Metrics.register(factory.createMetricName("PendingTasks"),
CommitLogReaderService.pendingTasks::size);

public static final Gauge<Integer> maxPendingTasks = Metrics.register(factory.createMetricName("MaxPendingTasks"),
CommitLogReaderService.maxPendingTasks::get);

public static final Gauge<Integer> uncleanedTasksGauge = Metrics.register(factory.createMetricName("UncleanedTasks"),
CommitLogReaderService.pendingTasks::size);

public static final Gauge<Integer> maxUncleanedTasks = Metrics.register(factory.createMetricName("MaxUncleanedTasks"),
CommitLogReaderService.maxUncleanedTasks::get);
}
Loading
Loading