Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion connectors-common/mysql-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<java.version>8</java.version>
<mysql.driver.version>8.0.33</mysql.driver.version>
<debezium.version>1.5.4.Final</debezium.version>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.5-SNAPSHOT</tapdata.pdk.api.version>
<pdk-error-code.version>1.0-SNAPSHOT</pdk-error-code.version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -146,6 +146,12 @@
<version>5.0.4.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.tapdata</groupId>
<artifactId>cdc-core</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import io.tapdata.common.ddl.type.DDLParserType;
import io.tapdata.common.ddl.wrapper.DDLWrapperConfig;
import io.tapdata.common.exception.ExceptionCollector;
import io.tapdata.connector.mysql.accept.MysqlAbstractAcceptor;
import io.tapdata.connector.mysql.accept.MysqlBatchAcceptor;
import io.tapdata.connector.mysql.accept.MysqlOneByOneAcceptor;
import io.tapdata.connector.mysql.config.MysqlConfig;
import io.tapdata.connector.mysql.constant.DeployModeEnum;
import io.tapdata.connector.mysql.entity.MysqlBinlogPosition;
Expand Down Expand Up @@ -44,9 +47,10 @@
import io.tapdata.kit.ErrorKit;
import io.tapdata.kit.StringKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.TapAdvanceFilter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -88,7 +92,7 @@ public class MysqlReader implements Closeable {
private final Supplier<Boolean> isAlive;
protected final MysqlJdbcContextV2 mysqlJdbcContext;
private EmbeddedEngine embeddedEngine;
protected StreamReadConsumer streamReadConsumer;
protected MysqlAbstractAcceptor<?, ?> streamReadConsumer;
private LinkedBlockingQueue<MysqlStreamEvent> eventQueue;
private ScheduledExecutorService mysqlSchemaHistoryMonitor;
protected KVReadOnlyMap<TapTable> tapTableMap;
Expand Down Expand Up @@ -368,8 +372,21 @@ public void taskStarted() {
}
}

protected MysqlAbstractAcceptor<?, ?> createAcceptor(int batchSize, TapStreamReadConsumer<?, Object> consumer) {
if (consumer instanceof StreamReadOneByOneConsumer) {
return new MysqlOneByOneAcceptor()
.setConsumer((StreamReadOneByOneConsumer) consumer);
} else if (consumer instanceof StreamReadConsumer) {
return new MysqlBatchAcceptor()
.setConsumer((StreamReadConsumer) consumer)
.setBatchSize(batchSize);
} else {
throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
}
}

public void readBinlog(TapConnectorContext tapConnectorContext, List<String> tables,
Object offset, int batchSize, DDLParserType ddlParserType, StreamReadConsumer consumer, HashMap<String, MysqlJdbcContextV2> contextMapForMasterSlave) throws Throwable {
Object offset, int batchSize, DDLParserType ddlParserType, TapStreamReadConsumer<?, Object> consumer, HashMap<String, MysqlJdbcContextV2> contextMapForMasterSlave) throws Throwable {
MysqlUtil.buildMasterNode(mysqlConfig, contextMapForMasterSlave);
try {
initDebeziumServerName(tapConnectorContext);
Expand Down Expand Up @@ -414,7 +431,7 @@ public void readBinlog(TapConnectorContext tapConnectorContext, List<String> tab
offsetStr = jsonParser.toJson(mysqlStreamOffset);
}
tapLogger.info("Starting mysql cdc, server name: " + serverName);
this.streamReadConsumer = consumer;
this.streamReadConsumer = createAcceptor(batchSize, consumer);
LockManager.mysqlSchemaHistoryTransferManager.computeIfAbsent(serverName, key -> {
this.schemaHistoryTransfer = new MysqlSchemaHistoryTransfer();
return this.schemaHistoryTransfer;
Expand Down Expand Up @@ -512,8 +529,14 @@ public void taskStarted() {
streamReadConsumer.streamReadStarted();
}
})
.using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
numberOfMessagesSinceLastCommit >= batchSize || timeSinceLastCommit.getSeconds() >= 5)
.using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) -> {
int size = Math.min(Math.max(1, streamReadConsumer.getBatchSize()), 10000);
//超时时间最小1秒,最大5秒
int timeout = Math.min(Math.max(1, size / 100), 5);
return numberOfMessagesSinceLastCommit >= size || timeSinceLastCommit.getSeconds() >= timeout;
})
// .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
// numberOfMessagesSinceLastCommit >= batchSize || timeSinceLastCommit.getSeconds() >= 5)
.using((result, message, throwable) -> {
tapConnectorContext.configContext();
if (result) {
Expand Down Expand Up @@ -654,32 +677,25 @@ private void consumeRecords(List<SourceRecord> sourceRecords, DebeziumEngine.Rec
if (null != throwableAtomicReference.get()) {
throw new RuntimeException(throwableAtomicReference.get());
}
List<MysqlStreamEvent> mysqlStreamEvents = new ArrayList<>();
for (SourceRecord record : sourceRecords) {
for (int i = 0; i < sourceRecords.size(); i++) {
SourceRecord record = sourceRecords.get(i);
boolean lastOne = (i == sourceRecords.size() - 1);
if (null == record || null == record.value()) continue;
Schema valueSchema = record.valueSchema();
if (null != valueSchema.field("op")) {
MysqlStreamEvent mysqlStreamEvent = wrapDML(record);
Optional.ofNullable(mysqlStreamEvent).ifPresent(mysqlStreamEvents::add);
MysqlStreamEvent mysqlStreamEvent = wrapDML(record, lastOne);
Optional.ofNullable(mysqlStreamEvent).ifPresent(streamReadConsumer::accept);
} else if (null != valueSchema.field("ddl")) {
mysqlStreamEvents.addAll(Objects.requireNonNull(wrapDDL(record)));
wrapDDLWithConsumer(record, lastOne, streamReadConsumer::accept);
} else if ("io.debezium.connector.common.Heartbeat".equals(valueSchema.name())) {
Optional.ofNullable((Struct) record.value())
.map(value -> value.getInt64("ts_ms"))
.map(TapSimplify::heartbeatEvent)
.map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, getMysqlStreamOffset(record)))
.ifPresent(mysqlStreamEvents::add);
.map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, lastOne ? getMysqlStreamOffset(record) : null))
.ifPresent(streamReadConsumer::accept);
}
}
if (CollectionUtils.isNotEmpty(mysqlStreamEvents)) {
List<TapEvent> tapEvents = new ArrayList<>();
MysqlStreamOffset mysqlStreamOffset = null;
for (MysqlStreamEvent mysqlStreamEvent : mysqlStreamEvents) {
tapEvents.add(mysqlStreamEvent.getTapEvent());
mysqlStreamOffset = mysqlStreamEvent.getMysqlStreamOffset();
}
streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
}
streamReadConsumer.complete();
}

protected void sourceRecordConsumer(SourceRecord record) {
Expand All @@ -688,31 +704,26 @@ protected void sourceRecordConsumer(SourceRecord record) {
}
if (null == record || null == record.value()) return;
Schema valueSchema = record.valueSchema();
List<MysqlStreamEvent> mysqlStreamEvents = new ArrayList<>();
if (null != valueSchema.field("op")) {
MysqlStreamEvent mysqlStreamEvent = wrapDML(record);
Optional.ofNullable(mysqlStreamEvent).ifPresent(mysqlStreamEvents::add);
Optional.ofNullable(mysqlStreamEvent).ifPresent(streamReadConsumer::accept);
} else if (null != valueSchema.field("ddl")) {
mysqlStreamEvents = wrapDDL(record);
wrapDDLWithConsumer(record, false, streamReadConsumer::accept);
} else if ("io.debezium.connector.common.Heartbeat".equals(valueSchema.name())) {
Optional.ofNullable((Struct) record.value())
.map(value -> value.getInt64("ts_ms"))
.map(TapSimplify::heartbeatEvent)
.map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, getMysqlStreamOffset(record)))
.ifPresent(mysqlStreamEvents::add);
}
if (CollectionUtils.isNotEmpty(mysqlStreamEvents)) {
List<TapEvent> tapEvents = new ArrayList<>();
MysqlStreamOffset mysqlStreamOffset = null;
for (MysqlStreamEvent mysqlStreamEvent : mysqlStreamEvents) {
tapEvents.add(mysqlStreamEvent.getTapEvent());
mysqlStreamOffset = mysqlStreamEvent.getMysqlStreamOffset();
}
streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
.ifPresent(streamReadConsumer::accept);
}
streamReadConsumer.complete();
}

protected MysqlStreamEvent wrapDML(SourceRecord record) {
return wrapDML(record, false);
}

protected MysqlStreamEvent wrapDML(SourceRecord record, boolean lastOne) {
TapRecordEvent tapRecordEvent = null;
Schema valueSchema = record.valueSchema();
Struct value = (Struct) record.value();
Expand Down Expand Up @@ -830,25 +841,27 @@ protected MysqlStreamEvent wrapDML(SourceRecord record) {
tapRecordEvent.setTableId(table);
tapRecordEvent.setReferenceTime(eventTime);
tapRecordEvent.setExactlyOnceId(getExactlyOnceId(record));
return wrapOffsetEvent(tapRecordEvent, record);
if (lastOne) {
return wrapOffsetEvent(tapRecordEvent, record);
}
return new MysqlStreamEvent(tapRecordEvent, null);
}

protected MysqlStreamEvent wrapOffsetEvent(TapEvent tapEvent, SourceRecord sourceRecord) {
MysqlStreamOffset mysqlStreamOffset = getMysqlStreamOffset(sourceRecord);
return new MysqlStreamEvent(tapEvent, mysqlStreamOffset);
}

protected List<MysqlStreamEvent> wrapDDL(SourceRecord record) {
List<MysqlStreamEvent> mysqlStreamEvents = new ArrayList<>();
protected void wrapDDLWithConsumer(SourceRecord record, boolean lastOne, Consumer<MysqlStreamEvent> consumer) {
Object value = record.value();
if (!(value instanceof Struct)) {
return null;
return;
}
Struct structValue = (Struct) value;
Struct source = structValue.getStruct("source");
Long eventTime = source.getInt64("ts_ms");
String ddlStr = structValue.getString(SOURCE_RECORD_DDL_KEY);
MysqlStreamOffset mysqlStreamOffset = getMysqlStreamOffset(record);
MysqlStreamOffset mysqlStreamOffset = lastOne ? getMysqlStreamOffset(record) : null;
if (StringUtils.isNotBlank(ddlStr)) {
try {
DDLFactory.ddlToTapDDLEvent(
Expand All @@ -862,7 +875,7 @@ protected List<MysqlStreamEvent> wrapDDL(SourceRecord record) {
tapDDLEvent.setReferenceTime(eventTime);
tapDDLEvent.setOriginDDL(ddlStr);
tapDDLEvent.setExactlyOnceId(getExactlyOnceId(record));
mysqlStreamEvents.add(mysqlStreamEvent);
consumer.accept(mysqlStreamEvent);
tapLogger.info("Read DDL: " + ddlStr + ", about to be packaged as some event(s)");
}
);
Expand All @@ -873,10 +886,17 @@ protected List<MysqlStreamEvent> wrapDDL(SourceRecord record) {
tapDDLEvent.setReferenceTime(eventTime);
tapDDLEvent.setOriginDDL(ddlStr);
tapDDLEvent.setExactlyOnceId(getExactlyOnceId(record));
mysqlStreamEvents.add(mysqlStreamEvent);
// throw new RuntimeException("Handle ddl failed: " + ddlStr + ", error: " + e.getMessage(), e);
consumer.accept(mysqlStreamEvent);
}
}
}

/**
* @deprecated
* */
protected List<MysqlStreamEvent> wrapDDL(SourceRecord record) {
List<MysqlStreamEvent> mysqlStreamEvents = new ArrayList<>();
wrapDDLWithConsumer(record, false, mysqlStreamEvents::add);
return mysqlStreamEvents;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.tapdata.connector.mysql.accept;

import io.tapdata.cdc.Acceptor;
import io.tapdata.connector.mysql.entity.MysqlStreamEvent;
import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;

/**
* @author <a href="2749984520@qq.com">Gavin'Xiao</a>
* @author <a href="https://github.com/11000100111010101100111">Gavin'Xiao</a>
* @version v1.0 2025/12/16 09:43 Create
* @description
*/
public abstract class MysqlAbstractAcceptor<M, C extends TapStreamReadConsumer<?, Object>> implements Acceptor<M, MysqlStreamEvent, C> {
protected Object offset;

public void updateOffset(Object offset) {
this.offset = offset;
}

public Object getOffset() {
return offset;
}

public void complete() {
//do nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.tapdata.connector.mysql.accept;

import io.tapdata.connector.mysql.entity.MysqlStreamEvent;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* @author <a href="2749984520@qq.com">Gavin'Xiao</a>
* @author <a href="https://github.com/11000100111010101100111">Gavin'Xiao</a>
* @version v1.0 2025/12/16 09:21 Create
* @description
*/
public class MysqlBatchAcceptor extends MysqlAbstractAcceptor<MysqlBatchAcceptor, StreamReadConsumer> {
StreamReadConsumer consumer;
int batchSize;
long batchSizeTimeout;
List<TapEvent> events = new ArrayList<>();

@Override
public void accept(MysqlStreamEvent e) {
if (null == e) {
return;
}
Optional.ofNullable(e.getMysqlStreamOffset()).ifPresent(this::updateOffset);
events.add(e.getTapEvent());
}

@Override
public void complete() {
if (events.isEmpty()) {
return;
}
consumer.accept(events, getOffset());
events = new ArrayList<>();
}

@Override
public void accept(List<TapEvent> e, Object offset) {
consumer.accept(e, offset);
}

@Override
public MysqlBatchAcceptor setConsumer(StreamReadConsumer consumer) {
this.consumer = consumer;
return this;
}

@Override
public MysqlBatchAcceptor setBatchSize(int size) {
this.batchSize = size;
return this;
}

@Override
public MysqlBatchAcceptor setBatchSizeTimeout(long ms) {
this.batchSizeTimeout = ms;
return this;
}

@Override
public void streamReadStarted() {
this.consumer.streamReadStarted();
}

@Override
public void streamReadEnded() {
this.consumer.streamReadEnded();
}

@Override
public StreamReadConsumer getConsumer() {
return this.consumer;
}

@Override
public int getBatchSize() {
return this.batchSize;
}
}
Loading