From 33f4fd9543a78d851d2150d118c5c07566ad69f5 Mon Sep 17 00:00:00 2001
From: GavinXiao <2749984520@qq.com>
Date: Wed, 17 Dec 2025 09:20:12 +0800
Subject: [PATCH] feat(TAP-8714): mysql support auto adjust stream read size
---
connectors-common/mysql-core/pom.xml | 8 +-
.../tapdata/connector/mysql/MysqlReader.java | 106 +++++++++++-------
.../mysql/accept/MysqlAbstractAcceptor.java | 27 +++++
.../mysql/accept/MysqlBatchAcceptor.java | 83 ++++++++++++++
.../mysql/accept/MysqlOneByOneAcceptor.java | 63 +++++++++++
connectors/mysql-connector/pom.xml | 2 +-
.../connector/mysql/MysqlConnector.java | 7 ++
7 files changed, 251 insertions(+), 45 deletions(-)
create mode 100644 connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlAbstractAcceptor.java
create mode 100644 connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlBatchAcceptor.java
create mode 100644 connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlOneByOneAcceptor.java
diff --git a/connectors-common/mysql-core/pom.xml b/connectors-common/mysql-core/pom.xml
index 82d6621ee..bbacf0451 100644
--- a/connectors-common/mysql-core/pom.xml
+++ b/connectors-common/mysql-core/pom.xml
@@ -33,7 +33,7 @@
8
8.0.33
1.5.4.Final
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.0-SNAPSHOT
@@ -146,6 +146,12 @@
5.0.4.RELEASE
test
+
+ io.tapdata
+ cdc-core
+ 1.0-SNAPSHOT
+ compile
+
diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java
index 14e661a73..9920df29b 100644
--- a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java
+++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java
@@ -10,6 +10,9 @@
import io.tapdata.common.ddl.type.DDLParserType;
import io.tapdata.common.ddl.wrapper.DDLWrapperConfig;
import io.tapdata.common.exception.ExceptionCollector;
+import io.tapdata.connector.mysql.accept.MysqlAbstractAcceptor;
+import io.tapdata.connector.mysql.accept.MysqlBatchAcceptor;
+import io.tapdata.connector.mysql.accept.MysqlOneByOneAcceptor;
import io.tapdata.connector.mysql.config.MysqlConfig;
import io.tapdata.connector.mysql.constant.DeployModeEnum;
import io.tapdata.connector.mysql.entity.MysqlBinlogPosition;
@@ -44,9 +47,10 @@
import io.tapdata.kit.ErrorKit;
import io.tapdata.kit.StringKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.TapAdvanceFilter;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -88,7 +92,7 @@ public class MysqlReader implements Closeable {
private final Supplier isAlive;
protected final MysqlJdbcContextV2 mysqlJdbcContext;
private EmbeddedEngine embeddedEngine;
- protected StreamReadConsumer streamReadConsumer;
+ protected MysqlAbstractAcceptor, ?> streamReadConsumer;
private LinkedBlockingQueue eventQueue;
private ScheduledExecutorService mysqlSchemaHistoryMonitor;
protected KVReadOnlyMap tapTableMap;
@@ -368,8 +372,21 @@ public void taskStarted() {
}
}
+ protected MysqlAbstractAcceptor, ?> createAcceptor(int batchSize, TapStreamReadConsumer, Object> consumer) {
+ if (consumer instanceof StreamReadOneByOneConsumer) {
+ return new MysqlOneByOneAcceptor()
+ .setConsumer((StreamReadOneByOneConsumer) consumer);
+ } else if (consumer instanceof StreamReadConsumer) {
+ return new MysqlBatchAcceptor()
+ .setConsumer((StreamReadConsumer) consumer)
+ .setBatchSize(batchSize);
+ } else {
+ throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
+ }
+ }
+
public void readBinlog(TapConnectorContext tapConnectorContext, List tables,
- Object offset, int batchSize, DDLParserType ddlParserType, StreamReadConsumer consumer, HashMap contextMapForMasterSlave) throws Throwable {
+ Object offset, int batchSize, DDLParserType ddlParserType, TapStreamReadConsumer, Object> consumer, HashMap contextMapForMasterSlave) throws Throwable {
MysqlUtil.buildMasterNode(mysqlConfig, contextMapForMasterSlave);
try {
initDebeziumServerName(tapConnectorContext);
@@ -414,7 +431,7 @@ public void readBinlog(TapConnectorContext tapConnectorContext, List tab
offsetStr = jsonParser.toJson(mysqlStreamOffset);
}
tapLogger.info("Starting mysql cdc, server name: " + serverName);
- this.streamReadConsumer = consumer;
+ this.streamReadConsumer = createAcceptor(batchSize, consumer);
LockManager.mysqlSchemaHistoryTransferManager.computeIfAbsent(serverName, key -> {
this.schemaHistoryTransfer = new MysqlSchemaHistoryTransfer();
return this.schemaHistoryTransfer;
@@ -512,8 +529,14 @@ public void taskStarted() {
streamReadConsumer.streamReadStarted();
}
})
- .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
- numberOfMessagesSinceLastCommit >= batchSize || timeSinceLastCommit.getSeconds() >= 5)
+ .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) -> {
+ int size = Math.min(Math.max(1, streamReadConsumer.getBatchSize()), 10000);
+ //超时时间最小1秒,最大5秒
+ int timeout = Math.min(Math.max(1, size / 100), 5);
+ return numberOfMessagesSinceLastCommit >= size || timeSinceLastCommit.getSeconds() >= timeout;
+ })
+// .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
+// numberOfMessagesSinceLastCommit >= batchSize || timeSinceLastCommit.getSeconds() >= 5)
.using((result, message, throwable) -> {
tapConnectorContext.configContext();
if (result) {
@@ -654,32 +677,25 @@ private void consumeRecords(List sourceRecords, DebeziumEngine.Rec
if (null != throwableAtomicReference.get()) {
throw new RuntimeException(throwableAtomicReference.get());
}
- List mysqlStreamEvents = new ArrayList<>();
- for (SourceRecord record : sourceRecords) {
+ for (int i = 0; i < sourceRecords.size(); i++) {
+ SourceRecord record = sourceRecords.get(i);
+ boolean lastOne = (i == sourceRecords.size() - 1);
if (null == record || null == record.value()) continue;
Schema valueSchema = record.valueSchema();
if (null != valueSchema.field("op")) {
- MysqlStreamEvent mysqlStreamEvent = wrapDML(record);
- Optional.ofNullable(mysqlStreamEvent).ifPresent(mysqlStreamEvents::add);
+ MysqlStreamEvent mysqlStreamEvent = wrapDML(record, lastOne);
+ Optional.ofNullable(mysqlStreamEvent).ifPresent(streamReadConsumer::accept);
} else if (null != valueSchema.field("ddl")) {
- mysqlStreamEvents.addAll(Objects.requireNonNull(wrapDDL(record)));
+ wrapDDLWithConsumer(record, lastOne, streamReadConsumer::accept);
} else if ("io.debezium.connector.common.Heartbeat".equals(valueSchema.name())) {
Optional.ofNullable((Struct) record.value())
.map(value -> value.getInt64("ts_ms"))
.map(TapSimplify::heartbeatEvent)
- .map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, getMysqlStreamOffset(record)))
- .ifPresent(mysqlStreamEvents::add);
+ .map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, lastOne ? getMysqlStreamOffset(record) : null))
+ .ifPresent(streamReadConsumer::accept);
}
}
- if (CollectionUtils.isNotEmpty(mysqlStreamEvents)) {
- List tapEvents = new ArrayList<>();
- MysqlStreamOffset mysqlStreamOffset = null;
- for (MysqlStreamEvent mysqlStreamEvent : mysqlStreamEvents) {
- tapEvents.add(mysqlStreamEvent.getTapEvent());
- mysqlStreamOffset = mysqlStreamEvent.getMysqlStreamOffset();
- }
- streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
- }
+ streamReadConsumer.complete();
}
protected void sourceRecordConsumer(SourceRecord record) {
@@ -688,31 +704,26 @@ protected void sourceRecordConsumer(SourceRecord record) {
}
if (null == record || null == record.value()) return;
Schema valueSchema = record.valueSchema();
- List mysqlStreamEvents = new ArrayList<>();
if (null != valueSchema.field("op")) {
MysqlStreamEvent mysqlStreamEvent = wrapDML(record);
- Optional.ofNullable(mysqlStreamEvent).ifPresent(mysqlStreamEvents::add);
+ Optional.ofNullable(mysqlStreamEvent).ifPresent(streamReadConsumer::accept);
} else if (null != valueSchema.field("ddl")) {
- mysqlStreamEvents = wrapDDL(record);
+ wrapDDLWithConsumer(record, false, streamReadConsumer::accept);
} else if ("io.debezium.connector.common.Heartbeat".equals(valueSchema.name())) {
Optional.ofNullable((Struct) record.value())
.map(value -> value.getInt64("ts_ms"))
.map(TapSimplify::heartbeatEvent)
.map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, getMysqlStreamOffset(record)))
- .ifPresent(mysqlStreamEvents::add);
- }
- if (CollectionUtils.isNotEmpty(mysqlStreamEvents)) {
- List tapEvents = new ArrayList<>();
- MysqlStreamOffset mysqlStreamOffset = null;
- for (MysqlStreamEvent mysqlStreamEvent : mysqlStreamEvents) {
- tapEvents.add(mysqlStreamEvent.getTapEvent());
- mysqlStreamOffset = mysqlStreamEvent.getMysqlStreamOffset();
- }
- streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
+ .ifPresent(streamReadConsumer::accept);
}
+ streamReadConsumer.complete();
}
protected MysqlStreamEvent wrapDML(SourceRecord record) {
+ return wrapDML(record, false);
+ }
+
+ protected MysqlStreamEvent wrapDML(SourceRecord record, boolean lastOne) {
TapRecordEvent tapRecordEvent = null;
Schema valueSchema = record.valueSchema();
Struct value = (Struct) record.value();
@@ -830,7 +841,10 @@ protected MysqlStreamEvent wrapDML(SourceRecord record) {
tapRecordEvent.setTableId(table);
tapRecordEvent.setReferenceTime(eventTime);
tapRecordEvent.setExactlyOnceId(getExactlyOnceId(record));
- return wrapOffsetEvent(tapRecordEvent, record);
+ if (lastOne) {
+ return wrapOffsetEvent(tapRecordEvent, record);
+ }
+ return new MysqlStreamEvent(tapRecordEvent, null);
}
protected MysqlStreamEvent wrapOffsetEvent(TapEvent tapEvent, SourceRecord sourceRecord) {
@@ -838,17 +852,16 @@ protected MysqlStreamEvent wrapOffsetEvent(TapEvent tapEvent, SourceRecord sourc
return new MysqlStreamEvent(tapEvent, mysqlStreamOffset);
}
- protected List wrapDDL(SourceRecord record) {
- List mysqlStreamEvents = new ArrayList<>();
+ protected void wrapDDLWithConsumer(SourceRecord record, boolean lastOne, Consumer consumer) {
Object value = record.value();
if (!(value instanceof Struct)) {
- return null;
+ return;
}
Struct structValue = (Struct) value;
Struct source = structValue.getStruct("source");
Long eventTime = source.getInt64("ts_ms");
String ddlStr = structValue.getString(SOURCE_RECORD_DDL_KEY);
- MysqlStreamOffset mysqlStreamOffset = getMysqlStreamOffset(record);
+ MysqlStreamOffset mysqlStreamOffset = lastOne ? getMysqlStreamOffset(record) : null;
if (StringUtils.isNotBlank(ddlStr)) {
try {
DDLFactory.ddlToTapDDLEvent(
@@ -862,7 +875,7 @@ protected List wrapDDL(SourceRecord record) {
tapDDLEvent.setReferenceTime(eventTime);
tapDDLEvent.setOriginDDL(ddlStr);
tapDDLEvent.setExactlyOnceId(getExactlyOnceId(record));
- mysqlStreamEvents.add(mysqlStreamEvent);
+ consumer.accept(mysqlStreamEvent);
tapLogger.info("Read DDL: " + ddlStr + ", about to be packaged as some event(s)");
}
);
@@ -873,10 +886,17 @@ protected List wrapDDL(SourceRecord record) {
tapDDLEvent.setReferenceTime(eventTime);
tapDDLEvent.setOriginDDL(ddlStr);
tapDDLEvent.setExactlyOnceId(getExactlyOnceId(record));
- mysqlStreamEvents.add(mysqlStreamEvent);
-// throw new RuntimeException("Handle ddl failed: " + ddlStr + ", error: " + e.getMessage(), e);
+ consumer.accept(mysqlStreamEvent);
}
}
+ }
+
+ /**
+ * @deprecated
+ * */
+ protected List wrapDDL(SourceRecord record) {
+ List mysqlStreamEvents = new ArrayList<>();
+ wrapDDLWithConsumer(record, false, mysqlStreamEvents::add);
return mysqlStreamEvents;
}
diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlAbstractAcceptor.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlAbstractAcceptor.java
new file mode 100644
index 000000000..f79d7e594
--- /dev/null
+++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlAbstractAcceptor.java
@@ -0,0 +1,27 @@
+package io.tapdata.connector.mysql.accept;
+
+import io.tapdata.cdc.Acceptor;
+import io.tapdata.connector.mysql.entity.MysqlStreamEvent;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/16 09:43 Create
+ * @description
+ */
+public abstract class MysqlAbstractAcceptor> implements Acceptor {
+ protected Object offset;
+
+ public void updateOffset(Object offset) {
+ this.offset = offset;
+ }
+
+ public Object getOffset() {
+ return offset;
+ }
+
+ public void complete() {
+ //do nothing
+ }
+}
diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlBatchAcceptor.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlBatchAcceptor.java
new file mode 100644
index 000000000..f78d95180
--- /dev/null
+++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlBatchAcceptor.java
@@ -0,0 +1,83 @@
+package io.tapdata.connector.mysql.accept;
+
+import io.tapdata.connector.mysql.entity.MysqlStreamEvent;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/16 09:21 Create
+ * @description
+ */
+public class MysqlBatchAcceptor extends MysqlAbstractAcceptor {
+ StreamReadConsumer consumer;
+ int batchSize;
+ long batchSizeTimeout;
+ List events = new ArrayList<>();
+
+ @Override
+ public void accept(MysqlStreamEvent e) {
+ if (null == e) {
+ return;
+ }
+ Optional.ofNullable(e.getMysqlStreamOffset()).ifPresent(this::updateOffset);
+ events.add(e.getTapEvent());
+ }
+
+ @Override
+ public void complete() {
+ if (events.isEmpty()) {
+ return;
+ }
+ consumer.accept(events, getOffset());
+ events = new ArrayList<>();
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ consumer.accept(e, offset);
+ }
+
+ @Override
+ public MysqlBatchAcceptor setConsumer(StreamReadConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public MysqlBatchAcceptor setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public MysqlBatchAcceptor setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public StreamReadConsumer getConsumer() {
+ return this.consumer;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return this.batchSize;
+ }
+}
diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlOneByOneAcceptor.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlOneByOneAcceptor.java
new file mode 100644
index 000000000..8e653b33c
--- /dev/null
+++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/accept/MysqlOneByOneAcceptor.java
@@ -0,0 +1,63 @@
+package io.tapdata.connector.mysql.accept;
+
+import io.tapdata.connector.mysql.entity.MysqlStreamEvent;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+
+import java.util.List;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/16 09:23 Create
+ * @description
+ */
+public class MysqlOneByOneAcceptor extends MysqlAbstractAcceptor {
+ StreamReadOneByOneConsumer consumer;
+
+ @Override
+ public void accept(MysqlStreamEvent e) {
+ consumer.accept(e.getTapEvent(), e.getMysqlStreamOffset());
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ consumer.accept(e, offset);
+ }
+
+ @Override
+ public MysqlOneByOneAcceptor setConsumer(StreamReadOneByOneConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public MysqlOneByOneAcceptor setBatchSize(int size) {
+ return this;
+ }
+
+ @Override
+ public MysqlOneByOneAcceptor setBatchSizeTimeout(long ms) {
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public int getBatchSize() {
+ return this.consumer.getBatchSize();
+ }
+
+ @Override
+ public StreamReadOneByOneConsumer getConsumer() {
+ return this.consumer;
+ }
+}
diff --git a/connectors/mysql-connector/pom.xml b/connectors/mysql-connector/pom.xml
index 2076f4998..4b36c0eb2 100644
--- a/connectors/mysql-connector/pom.xml
+++ b/connectors/mysql-connector/pom.xml
@@ -23,7 +23,7 @@
8
- 2.0.2-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java b/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java
index 98221139a..c7f608a74 100644
--- a/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java
+++ b/connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java
@@ -42,6 +42,7 @@
import io.tapdata.partition.DatabaseReadPartitionSplitter;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.*;
@@ -272,6 +273,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportBatchCount(this::batchCount);
connectorFunctions.supportBatchRead(this::batchReadWithoutOffset);
connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamReadOneByOne);
connectorFunctions.supportTimestampToStreamOffset(this::timestampToStreamOffset);
connectorFunctions.supportQueryByAdvanceFilter(this::queryByAdvanceFilterWithOffset);
connectorFunctions.supportCountByPartitionFilterFunction(this::countByAdvanceFilter);
@@ -840,6 +842,11 @@ private void streamRead(TapConnectorContext tapConnectorContext, List ta
mysqlReader.readBinlog(tapConnectorContext, tables, offset, batchSize, DDLParserType.MYSQL_CCJ_SQL_PARSER, consumer, contextMapForMasterSlave);
}
+ private void streamReadOneByOne(TapConnectorContext context, List tables, Object offset, StreamReadOneByOneConsumer consumer) throws Throwable {
+ throwNonSupportWhenLightInit();
+ mysqlReader.readBinlog(context, tables, offset, consumer.getBatchSize(), DDLParserType.MYSQL_CCJ_SQL_PARSER, consumer, contextMapForMasterSlave);
+ }
+
@Override
public ConnectionOptions connectionTest(TapConnectionContext connectionContext, Consumer consumer) {