Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit. Hold shift + click to select a range.
9c5eb0c
feat(TAP-8714): mongo db support auto adjust stream read size
11000100111010101100111 Dec 9, 2025
3e3e1f4
Merge pull request #665 from tapdata/TAP-8714-auto-set-batch-size-mon…
11000100111010101100111 Dec 9, 2025
d41177c
feat(TAP-8714): pg db support auto adjust stream read size
11000100111010101100111 Dec 10, 2025
e74ba38
feat(TAP-8714): In MongoDB, abstract parent classes are also used
11000100111010101100111 Dec 10, 2025
6328029
feat(TAP-8714): create offset at first when cdc use debezium
11000100111010101100111 Dec 11, 2025
dc530d6
feat(TAP-8714): In MongoDB, abstract parent classes are also used
11000100111010101100111 Dec 10, 2025
b06cd76
Merge branch 'TAP-8714-auto-set-batch-size-mastr-dev' into TAP-8714-a…
11000100111010101100111 Dec 11, 2025
600bdfa
feat(TAP-8714): Add timeout and over batch settings
11000100111010101100111 Dec 11, 2025
469269c
Merge pull request #668 from tapdata/TAP-8714-auto-set-batch-size-pg-dev
11000100111010101100111 Dec 11, 2025
1775bea
feat(TAP-8714): up api version to 2.0.5
11000100111010101100111 Dec 15, 2025
e3a179b
Merge branch 'develop' into TAP-8714-auto-set-batch-size-mastr-dev
11000100111010101100111 Dec 15, 2025
9fd7f83
fix(TAP-8714): add new method to register cdc consumer
11000100111010101100111 Dec 15, 2025
e949881
fix(TAP-8714): Compatible with old methods before adjustment
11000100111010101100111 Dec 15, 2025
47afea8
Merge branch 'develop' into TAP-8714-auto-set-batch-size-mastr-dev
11000100111010101100111 Dec 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions connectors-common/cdc-core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>io.tapdata</groupId>
        <artifactId>connectors-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>cdc-core</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Fixed property-name typo "verison" -> "version"; the property is
             defined and referenced only within this POM, so the rename is
             self-contained. (The same typo exists independently in
             postgres-core/pom.xml — fix there separately.) -->
        <tapdata.pdk.api.version>2.0.5-SNAPSHOT</tapdata.pdk.api.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.tapdata</groupId>
            <artifactId>tapdata-pdk-api</artifactId>
            <version>${tapdata.pdk.api.version}</version>
        </dependency>
    </dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.tapdata.cdc;

import io.tapdata.entity.event.TapEvent;
import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;

import java.util.List;

/**
 * Contract for accepting change-data-capture events and forwarding them to a
 * stream-read consumer, either one raw event at a time or as pre-assembled
 * batches of {@link TapEvent}s.
 *
 * <p>Type parameters renamed from {@code Acc}/{@code EType}/{@code Consumer}:
 * the old name {@code Consumer} shadowed {@code java.util.function.Consumer}
 * and all three violated Java type-parameter naming conventions. The rename is
 * invisible to callers (type-parameter names are not part of the binary or
 * source interface).
 *
 * @param <A> the concrete acceptor type, returned by the fluent setters
 *            (self-type pattern)
 * @param <E> the raw event type accepted from the CDC source
 * @param <C> the downstream consumer type that receives the events
 */
public interface Acceptor<A, E, C extends TapStreamReadConsumer<?, ?>> {

    /**
     * Accepts a single raw source event.
     *
     * @param event the raw source event to process
     */
    void accept(E event);

    /**
     * Accepts a batch of already-converted events together with the stream
     * offset associated with that batch.
     *
     * @param events the converted events to deliver downstream
     * @param offset the stream offset paired with this batch
     */
    void accept(List<TapEvent> events, Object offset);

    /**
     * Registers the downstream consumer.
     *
     * @param consumer the consumer to deliver events to
     * @return this acceptor, for fluent chaining
     */
    A setConsumer(C consumer);

    /**
     * Configures the batch size.
     *
     * @param size number of events per batch
     * @return this acceptor, for fluent chaining
     */
    A setBatchSize(int size);

    /**
     * Configures the batch timeout in milliseconds — presumably the maximum
     * wait before a partially filled batch is flushed; confirm against
     * implementations.
     *
     * @param ms timeout in milliseconds
     * @return this acceptor, for fluent chaining
     */
    A setBatchSizeTimeout(long ms);

    /** Lifecycle callback invoked when the stream read starts. */
    void streamReadStarted();

    /** Lifecycle callback invoked when the stream read ends. */
    void streamReadEnded();

    /**
     * @return the configured batch size; defaults to {@code 1}
     */
    default int getBatchSize() {
        return 1;
    }

    /**
     * @return the consumer previously registered via {@link #setConsumer}
     */
    C getConsumer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.tapdata.cdc;

import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;

/**
 * Skeletal {@link Acceptor} base class for a custom source-event type
 * {@code EO}. The type parameter {@code T} is the concrete subclass itself
 * (self-type pattern), which lets the fluent setters declared on
 * {@link Acceptor} return the subclass type without casts.
 *
 * <p>NOTE(review): the class name spelling "Accepter" is inconsistent with the
 * interface spelling "Acceptor"; renaming would break existing subclasses, so
 * it is only flagged here.
 *
 * @param <EO>       the raw source-event type this acceptor consumes
 * @param <T>        the concrete subclass (self type)
 * @param <Consumer> the downstream stream-read consumer type
 * @author <a href="2749984520@qq.com">Gavin'Xiao</a>
 * @author <a href="https://github.com/11000100111010101100111">Gavin'Xiao</a>
 * @version v1.0 2025/12/10 15:03 Create
 */
public abstract class CustomAbstractAccepter<EO, T extends CustomAbstractAccepter<EO, T, Consumer>, Consumer extends TapStreamReadConsumer<?, Object>>
        implements Acceptor<T, EO, Consumer> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.tapdata.cdc;

import io.tapdata.entity.event.TapEvent;
import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;

/**
 * Specialization of {@link CustomAbstractAccepter} whose raw source-event type
 * is fixed to {@link TapEvent} — for sources that already produce TapEvents
 * and need no custom-event conversion step.
 *
 * @param <T>        the concrete subclass (self type)
 * @param <Consumer> the downstream stream-read consumer type
 * @author <a href="2749984520@qq.com">Gavin'Xiao</a>
 * @author <a href="https://github.com/11000100111010101100111">Gavin'Xiao</a>
 * @version v1.0 2025/12/10 15:00 Create
 */
public abstract class EventAbstractAccepter<T extends EventAbstractAccepter<T, Consumer>, Consumer extends TapStreamReadConsumer<?, Object>> extends CustomAbstractAccepter<TapEvent, T, Consumer> {
}
1 change: 1 addition & 0 deletions connectors-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<module>js-connector-core-plus</module>
<module>read-partition</module>
<module>hive-core</module>
<module>cdc-core</module>
<!-- <module>dependency-core</module>-->
</modules>
<properties>
Expand Down
7 changes: 6 additions & 1 deletion connectors-common/postgres-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
<debezium.version>1.5.4.Final</debezium.version>
<postgres.core.version>1.0-SNAPSHOT</postgres.core.version>
<tapdata.pdk.api.verison>2.0.1-SNAPSHOT</tapdata.pdk.api.verison>
<tapdata.pdk.api.verison>2.0.5-SNAPSHOT</tapdata.pdk.api.verison>
<fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -124,6 +124,11 @@
<artifactId>commons-compress</artifactId>
<version>1.27.1</version>
</dependency>
<dependency>
<groupId>io.tapdata</groupId>
<artifactId>cdc-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.tapdata.connector.postgres.cdc;

import com.google.common.collect.Lists;
import io.tapdata.cdc.CustomAbstractAccepter;
import io.tapdata.common.sqlparser.ResultDO;
import io.tapdata.connector.postgres.PostgresJdbcContext;
import io.tapdata.connector.postgres.cdc.accept.LogMinerProBatchAccepter;
import io.tapdata.connector.postgres.cdc.accept.LogMinerProOneByOneAccepter;
import io.tapdata.connector.postgres.config.PostgresConfig;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
Expand All @@ -18,14 +21,23 @@
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.StringKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand All @@ -35,8 +47,8 @@ public abstract class AbstractWalLogMiner {

protected final PostgresJdbcContext postgresJdbcContext;
protected final Log tapLogger;
protected StreamReadConsumer consumer;
protected int recordSize;
//protected StreamReadConsumer consumer;
protected CustomAbstractAccepter<NormalRedo, ?, ?> consumer;
protected List<String> tableList;
protected boolean filterSchema;
private Map<String, String> dataTypeMap;
Expand Down Expand Up @@ -95,9 +107,26 @@ public AbstractWalLogMiner withWalLogDirectory(String walLogDirectory) {

public abstract void startMiner(Supplier<Boolean> isAlive) throws Throwable;

/**
 * Registers a classic batch consumer with the miner.
 *
 * @deprecated compatibility shim kept for pre-adjustment callers; use the
 *             registerCdcConsumer method instead. NOTE(review): the method
 *             should also carry the {@code @Deprecated} annotation, not only
 *             the Javadoc tag.
 * */
public AbstractWalLogMiner registerConsumer(StreamReadConsumer consumer, int recordSize) {
    // NOTE(review): the two assignments below appear to be stale pre-diff
    // lines from the rendered PR view — `this.consumer` is now typed
    // CustomAbstractAccepter (see field declaration), so a direct
    // StreamReadConsumer assignment would not compile. Confirm against the
    // merged source; the delegation on the return line is the intended body.
    this.consumer = consumer;
    this.recordSize = recordSize;
    return registerCdcConsumer(consumer, recordSize);
}

/**
 * Binds a stream-read consumer to this miner by wrapping it in the matching
 * acceptor implementation.
 *
 * @param consumer   the consumer to receive CDC events; must be either a
 *                   StreamReadConsumer or a StreamReadOneByOneConsumer
 * @param recordSize batch size applied when the consumer is batch-style
 * @return this miner, for fluent chaining
 * @throws IllegalArgumentException if the consumer is of an unsupported type
 */
public AbstractWalLogMiner registerCdcConsumer(TapStreamReadConsumer<?, Object> consumer, int recordSize) {
    // Batch-style consumers get a batching acceptor sized by recordSize.
    // (instanceof order preserved from the original: StreamReadConsumer wins
    // if a consumer somehow matches both types.)
    if (consumer instanceof StreamReadConsumer) {
        this.consumer = new LogMinerProBatchAccepter()
                .setConsumer((StreamReadConsumer) consumer)
                .setBatchSize(recordSize)
                .setEventCreator(this::createEvent);
        return this;
    }
    // One-by-one consumers pace themselves; no batch size is configured.
    if (consumer instanceof StreamReadOneByOneConsumer) {
        this.consumer = new LogMinerProOneByOneAccepter()
                .setConsumer((StreamReadOneByOneConsumer) consumer)
                .setEventCreator(this::createEvent);
        return this;
    }
    throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.tapdata.connector.postgres.PostgresJdbcContext;
import io.tapdata.connector.postgres.cdc.accept.DebeziumBatchAccepter;
import io.tapdata.connector.postgres.cdc.accept.DebeziumOneByOneAccepter;
import io.tapdata.connector.postgres.cdc.accept.PGEventAbstractAccepter;
import io.tapdata.connector.postgres.cdc.config.PostgresDebeziumConfig;
import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
import io.tapdata.connector.postgres.cdc.offset.PostgresOffsetStorage;
import io.tapdata.connector.postgres.config.PostgresConfig;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
import io.tapdata.entity.event.dml.TapInsertRecordEvent;
Expand All @@ -19,14 +20,15 @@
import io.tapdata.entity.logger.TapLogger;
import io.tapdata.entity.schema.TapTable;
import io.tapdata.entity.schema.partition.TapSubPartitionTableInfo;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.entity.utils.DataMap;
import io.tapdata.entity.utils.cache.Entry;
import io.tapdata.entity.utils.cache.Iterator;
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.NumberKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -39,8 +41,16 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -57,8 +67,8 @@ public class PostgresCdcRunner extends DebeziumCdcRunner {
private final TapConnectorContext connectorContext;
private PostgresDebeziumConfig postgresDebeziumConfig;
private PostgresOffset postgresOffset;
private int recordSize;
private StreamReadConsumer consumer;
//private StreamReadConsumer consumer;
protected PGEventAbstractAccepter<?, ?> consumer;
private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
protected TimeZone timeZone;
private String dropTransactionId = null;
Expand Down Expand Up @@ -145,9 +155,27 @@ public AtomicReference<Throwable> getThrowable() {
return throwableAtomicReference;
}

/**
 * Registers a classic batch consumer with the CDC runner.
 *
 * @deprecated compatibility shim kept for pre-adjustment callers; use the
 *             registerCdcConsumer method instead. NOTE(review): the method
 *             should also carry the {@code @Deprecated} annotation, not only
 *             the Javadoc tag.
 * */
public void registerConsumer(StreamReadConsumer consumer, int recordSize) {
    // NOTE(review): the two assignments below appear to be stale pre-diff
    // lines from the rendered PR view — the `consumer` field is now typed
    // PGEventAbstractAccepter (see field declaration), so a direct
    // StreamReadConsumer assignment would not compile. Confirm against the
    // merged source; the delegation call is the intended body.
    this.recordSize = recordSize;
    this.consumer = consumer;
    registerCdcConsumer(consumer, recordSize);
}

public void registerCdcConsumer(TapStreamReadConsumer<?, Object> consumer, int recordSize) {
Supplier<Integer> batchSizeGetter;
if (consumer instanceof StreamReadConsumer) {
this.consumer = new DebeziumBatchAccepter()
.setConsumer((StreamReadConsumer) consumer)
.setBatchSize(recordSize);
batchSizeGetter = () -> recordSize;
} else if (consumer instanceof StreamReadOneByOneConsumer) {
this.consumer = new DebeziumOneByOneAccepter()
.setConsumer((StreamReadOneByOneConsumer) consumer);
batchSizeGetter = () -> ((StreamReadOneByOneConsumer) consumer).getBatchSize();
} else {
throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
}
//build debezium engine
this.engine = (EmbeddedEngine) EmbeddedEngine.create()
.using(postgresDebeziumConfig.create())
Expand All @@ -167,8 +195,12 @@ public void taskStopped() {
// .using(this.getClass().getClassLoader())
// .using(Clock.SYSTEM)
// .notifying(this::consumeRecord)
.using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
numberOfMessagesSinceLastCommit >= recordSize || timeSinceLastCommit.getSeconds() >= 5)
.using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) -> {
int size = Math.min(Math.max(1, batchSizeGetter.get()), 10000);
//超时时间最小1秒,最大10秒
int timeout = Math.min(Math.max(1, size / 100), 5);
return numberOfMessagesSinceLastCommit >= size || timeSinceLastCommit.getSeconds() >= timeout;
})
.notifying(this::consumeRecords).using((result, message, throwable) -> {
if (result) {
if (StringUtils.isNotBlank(message)) {
Expand All @@ -195,7 +227,7 @@ public void taskStopped() {
@Override
public void consumeRecords(List<SourceRecord> sourceRecords, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
super.consumeRecords(sourceRecords, committer);
List<TapEvent> eventList = TapSimplify.list();
//List<TapEvent> eventList = TapSimplify.list();
Map<String, ?> offset = null;
for (SourceRecord sr : sourceRecords) {
try {
Expand All @@ -207,7 +239,8 @@ public void consumeRecords(List<SourceRecord> sourceRecords, DebeziumEngine.Reco
continue;
}
if ("io.debezium.connector.common.Heartbeat".equals(sr.valueSchema().name())) {
eventList.add(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms")));
consumer.updateOffset(offset)
.accept(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms")));
continue;
} else if (EmptyKit.isNull(sr.valueSchema().field("op"))) {
continue;
Expand Down Expand Up @@ -260,22 +293,12 @@ public void consumeRecords(List<SourceRecord> sourceRecords, DebeziumEngine.Reco
event.setNamespaces(Lists.newArrayList(schema, table));
}
}
eventList.add(event);
if (eventList.size() >= recordSize) {
PostgresOffset postgresOffset = new PostgresOffset();
postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
consumer.accept(eventList, postgresOffset);
eventList = TapSimplify.list();
}
consumer.updateOffset(offset).accept(event);
} catch (StopConnectorException | StopEngineException ex) {
throw ex;
}
}
if (EmptyKit.isNotEmpty(eventList)) {
PostgresOffset postgresOffset = new PostgresOffset();
postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
consumer.accept(eventList, postgresOffset);
}
consumer.updateOffset(offset).accept(null);
}

private DataMap getMapFromStruct(Struct struct) {
Expand Down
Loading
Loading