From 0143d697f109fd61b975ec83ff335b3dc765e212 Mon Sep 17 00:00:00 2001
From: GavinXiao <2749984520@qq.com>
Date: Mon, 24 Nov 2025 17:28:34 +0800
Subject: [PATCH 1/5] feat(TAP-8714): Replace batch event consumption with
 one-by-one event processing
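
The MQ and CDC sources used to accumulate TapEvents into lists sized by
eventBatchSize/recordSize and hand them to the engine through
BiConsumer<List<TapEvent>, Object>. This patch moves batching out of the
connectors: stream reads now emit every event immediately through the new
StreamReadOneByOneConsumer, each paired with the offset that produced it,
and the engine re-accumulates batches according to the new
autoAccumulateBatch spec block. The shape of the migration, sketched with
names from the diff below:

    // before: connector-side batching
    streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
    // after: one event per call, one offset snapshot per event
    streamReadConsumer.accept(tapEvent, mysqlStreamOffset);

Every connector spec gains an autoAccumulateBatch section (accumulation off
for batch reads, on for incremental reads, 1000 ms maximum delay), and the
PDK dependencies move to the 2.0.5-SNAPSHOT line that ships the new
consumer interface.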
---
connectors-common/connector-core/pom.xml | 2 +-
connectors-common/hive-core/pom.xml | 2 +-
.../connector/kafka/KafkaSRService.java | 3 +-
.../tapdata/connector/kafka/KafkaService.java | 41 +++----
.../connector/kafka/util/BatchPusher.java | 8 +-
.../java/io/tapdata/common/MqService.java | 5 +-
connectors-common/mysql-core/pom.xml | 2 +-
.../tapdata/connector/mysql/MysqlReader.java | 36 ++----
connectors-common/pom.xml | 4 +-
connectors-common/postgres-core/pom.xml | 2 +-
.../postgres/cdc/AbstractWalLogMiner.java | 8 +-
.../postgres/cdc/PostgresCdcRunner.java | 28 ++---
.../connector/postgres/cdc/WalLogMiner.java | 19 +---
.../connector/postgres/cdc/WalLogMinerV2.java | 19 +---
.../connector/postgres/cdc/WalLogMinerV3.java | 22 +---
.../connector/postgres/cdc/WalPgtoMiner.java | 19 +---
connectors-common/read-partition/pom.xml | 4 +-
.../connector/activemq/ActivemqConnector.java | 8 +-
.../connector/activemq/ActivemqService.java | 29 ++---
.../src/main/resources/spec_activemq.json | 12 +-
.../aliyun-adb-postgres-connector/pom.xml | 2 +-
.../resources/aliyun-adb-postgres-spec.json | 12 +-
connectors/aliyun-mongodb-connector/pom.xml | 2 +-
.../main/resources/aliyun-mongodb-spec.json | 12 +-
.../aliyun-rds-mariadb-connector/pom.xml | 2 +-
.../resources/aliyun-rds-mariadb-spec.json | 12 +-
connectors/aliyun-rds-mysql-connector/pom.xml | 2 +-
.../main/resources/aliyun-rds-mysql-spec.json | 12 +-
.../aliyun-rds-postgres-connector/pom.xml | 2 +-
.../resources/aliyun-rds-postgres-spec.json | 12 +-
connectors/aws-clickhouse-connector/pom.xml | 2 +-
.../src/main/resources/aws_clickhouse.json | 12 +-
connectors/aws-rds-mysql-connector/pom.xml | 2 +-
.../main/resources/aws-rds-mysql-spec.json | 12 +-
connectors/azure-cosmosdb-connector/pom.xml | 2 +-
.../main/resources/azure-cosmosdb-spec.json | 12 +-
connectors/clickhouse-connector/pom.xml | 2 +-
.../src/main/resources/spec_clickhouse.json | 12 +-
connectors/highgo-connector/pom.xml | 2 +-
.../connector/postgres/HighgoConnector.java | 9 +-
.../src/main/resources/spec_highgo.json | 12 +-
connectors/kafka-connector/pom.xml | 2 +-
.../connector/kafka/KafkaConnector.java | 8 +-
.../src/main/resources/spec_kafka.json | 12 +-
connectors/mariadb-connector/pom.xml | 2 +-
.../src/main/resources/spec_mariadb.json | 12 +-
connectors/mongodb-connector/pom.xml | 2 +-
.../io/tapdata/mongodb/MongodbConnector.java | 20 ++--
.../mongodb/reader/MongodbStreamReader.java | 4 +-
.../mongodb/reader/MongodbV4StreamReader.java | 29 +----
.../reader/v3/MongodbV3StreamReader.java | 86 +++++---------
.../src/main/resources/spec.json | 12 +-
.../tapdata/mongodb/MongodbConnectorTest.java | 106 +++++++++---------
connectors/mysql-connector/pom.xml | 2 +-
.../connector/mysql/MysqlConnector.java | 8 +-
.../src/main/resources/mysql-spec.json | 12 +-
connectors/pom.xml | 4 +-
connectors/postgres-connector/pom.xml | 2 +-
.../connector/postgres/PostgresConnector.java | 22 ++--
.../src/main/resources/spec_postgres.json | 12 +-
.../connector/rabbitmq/RabbitmqConnector.java | 8 +-
.../connector/rabbitmq/RabbitmqService.java | 36 +++---
.../src/main/resources/spec_rabbitmq.json | 12 +-
.../connector/rocketmq/RocketmqConnector.java | 8 +-
.../connector/rocketmq/RocketmqService.java | 28 ++---
.../src/main/resources/spec_rocketmq.json | 12 +-
connectors/tidb-connector/pom.xml | 2 +-
.../tapdata/connector/tidb/TidbConnector.java | 6 +-
.../analyse/AnalyseTapEventFromDDLObject.java | 10 +-
.../cdc/process/thread/ProcessHandler.java | 6 +-
.../cdc/process/thread/TapEventManager.java | 8 +-
.../src/main/resources/spec_tidb.json | 12 +-
.../connector/tidb/TidbConnectorTest.java | 12 +-
.../AnalyseTapEventFromDDLObjectTest.java | 8 +-
.../process/thread/TapEventManagerTest.java | 17 ++-
connectors/vastbase-connector/pom.xml | 2 +-
.../connector/postgres/VastbaseConnector.java | 9 +-
.../src/main/resources/spec_vastbase.json | 12 +-
78 files changed, 499 insertions(+), 488 deletions(-)
diff --git a/connectors-common/connector-core/pom.xml b/connectors-common/connector-core/pom.xml
index b04913318..09d07ce95 100644
--- a/connectors-common/connector-core/pom.xml
+++ b/connectors-common/connector-core/pom.xml
@@ -33,7 +33,7 @@
io.tapdata
tapdata-pdk-runner
- 2.0-SNAPSHOT
+ 2.5-SNAPSHOT
test
diff --git a/connectors-common/hive-core/pom.xml b/connectors-common/hive-core/pom.xml
index f64472c12..a98dd637a 100644
--- a/connectors-common/hive-core/pom.xml
+++ b/connectors-common/hive-core/pom.xml
@@ -18,7 +18,7 @@
8
1.0-SNAPSHOT
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaSRService.java b/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaSRService.java
index f46690ce0..3455c0619 100644
--- a/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaSRService.java
+++ b/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaSRService.java
@@ -31,6 +31,7 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import java.io.BufferedReader;
import java.io.IOException;
@@ -313,7 +314,7 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
- public void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
+ public void streamConsume(List<String> tableList, StreamReadOneByOneConsumer eventsOffsetConsumer) {
throw new CoreException("The schemaRegister function is not supported as the source for the time being. ");
}
diff --git a/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaService.java b/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaService.java
index ab1295802..41f9edbed 100644
--- a/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaService.java
+++ b/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/KafkaService.java
@@ -47,6 +47,7 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import javax.script.Invocable;
import javax.script.ScriptEngine;
@@ -641,13 +642,13 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
makeMessage(consumerRecord, tableName, list::add);
if (list.size() >= eventBatchSize) {
- syncEventSubmit(list, eventsOffsetConsumer);
+ eventsOffsetConsumer.accept(list, TapSimplify.list());
list = TapSimplify.list();
}
}
}
if (EmptyKit.isNotEmpty(list)) {
- syncEventSubmit(list, eventsOffsetConsumer);
+ eventsOffsetConsumer.accept(list, TapSimplify.list());
}
} catch (Exception e) {
throwable.set(e);
@@ -669,37 +670,27 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
- private void syncEventSubmit(List<TapEvent> eventList, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
- eventsOffsetConsumer.accept(eventList, TapSimplify.list());
- }
-
@Override
- public void streamConsume(List<String> tableList, Object offset, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
+ public void streamConsume(List<String> tableList, Object offset, StreamReadOneByOneConsumer eventsOffsetConsumer) {
consuming.set(true);
int maxDelay = 500;
KafkaConfig kafkaConfig = (KafkaConfig) mqConfig;
ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration((kafkaConfig), connectorId, true);
try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfiguration.build())) {
KafkaOffset streamOffset = KafkaOffsetUtils.setConsumerByOffset(kafkaConsumer, tableList, offset, consuming);
- try (BatchPusher<TapEvent> batchPusher = new BatchPusher<>(
- tapEvents -> eventsOffsetConsumer.accept(tapEvents, streamOffset.clone())
- ).batchSize(eventBatchSize).maxDelay(maxDelay)) {
- // Push the initial offset to the target so that an incremental task started from a given time picks it up on its next start
- Optional.of(new HeartbeatEvent()).ifPresent(event -> {
- event.setTime(System.currentTimeMillis());
- batchPusher.add(event);
- });
+ // Push the initial offset to the target so that an incremental task started from a given time picks it up on its next start
+ Optional.of(new HeartbeatEvent()).ifPresent(event -> {
+ event.setTime(System.currentTimeMillis());
+ eventsOffsetConsumer.accept(event, streamOffset.clone());
+ });
- // Consume the data
- while (consuming.get()) {
- ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2L));
- if (consumerRecords.isEmpty()) {
- batchPusher.checkAndSummit();
- } else {
- for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
- streamOffset.addTopicOffset(consumerRecord); // advance the offset
- makeMessage(consumerRecord, consumerRecord.topic(), batchPusher::add);
- }
+ // Consume the data
+ while (consuming.get()) {
+ ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2L));
+ if (!consumerRecords.isEmpty()) {
+ for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
+ streamOffset.addTopicOffset(consumerRecord); // advance the offset
+ makeMessage(consumerRecord, consumerRecord.topic(), e -> eventsOffsetConsumer.accept(e, streamOffset.clone()));
}
}
}
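
Note: the per-event path above targets the 2.0.5-SNAPSHOT PDK consumer. A
minimal sketch of its assumed shape, inferred from the call sites in this
patch rather than taken from the PDK source:

    public interface StreamReadOneByOneConsumer {
        void accept(TapEvent event, Object offset); // one event, one offset snapshot
        default void streamReadStarted() {}         // invoked by the WAL miners below
    }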
diff --git a/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/util/BatchPusher.java b/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/util/BatchPusher.java
index cc7646f05..238e9717e 100644
--- a/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/util/BatchPusher.java
+++ b/connectors-common/kafka-core/src/main/java/io/tapdata/connector/kafka/util/BatchPusher.java
@@ -11,7 +11,6 @@
*/
public class BatchPusher<T> implements AutoCloseable {
- private int batchSize = 100;
private int maxDelay = 2000;
private long lastTime;
private final Consumer<List<T>> submitConsumer;
@@ -23,11 +22,6 @@ public BatchPusher(Consumer<List<T>> submitConsumer) {
this.submitConsumer = submitConsumer;
}
- public BatchPusher batchSize(int batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
public BatchPusher maxDelay(int maxDelay) {
this.maxDelay = maxDelay;
return this;
@@ -39,7 +33,7 @@ public void add(T record) {
}
public void checkAndSummit() {
- if (batchList.size() >= batchSize || (System.currentTimeMillis() - lastTime > maxDelay && !batchList.isEmpty())) {
+ if (System.currentTimeMillis() - lastTime > maxDelay && !batchList.isEmpty()) {
summit();
}
}
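
With size-based flushing removed, BatchPusher becomes a purely time-based
flusher: checkAndSummit() submits only when maxDelay has elapsed since the
last submit and the buffer is non-empty. A minimal usage sketch, assuming
close() flushes the remainder:

    static void demo() throws Exception {
        try (BatchPusher<String> pusher = new BatchPusher<String>(batch -> System.out.println(batch))) {
            pusher.maxDelay(1000);
            pusher.add("event-1");
            pusher.checkAndSummit(); // no-op until 1000 ms of quiet, regardless of buffer size
        }
    }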
diff --git a/connectors-common/mq-core/src/main/java/io/tapdata/common/MqService.java b/connectors-common/mq-core/src/main/java/io/tapdata/common/MqService.java
index c2aac3144..c2dbd9292 100644
--- a/connectors-common/mq-core/src/main/java/io/tapdata/common/MqService.java
+++ b/connectors-common/mq-core/src/main/java/io/tapdata/common/MqService.java
@@ -8,6 +8,7 @@
import io.tapdata.pdk.apis.entity.TestItem;
import io.tapdata.pdk.apis.entity.WriteListResult;
import io.tapdata.pdk.apis.functions.connection.ConnectionCheckItem;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import java.util.List;
import java.util.function.BiConsumer;
@@ -43,11 +44,11 @@ default void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
- default void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) throws Throwable {
+ default void streamConsume(List<String> tableList, StreamReadOneByOneConsumer eventsOffsetConsumer) throws Throwable {
throw new UnsupportedOperationException();
}

- default void streamConsume(List<String> tableList, Object offset, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
+ default void streamConsume(List<String> tableList, Object offset, StreamReadOneByOneConsumer eventsOffsetConsumer) {
throw new UnsupportedOperationException();
}
}
diff --git a/connectors-common/mysql-core/pom.xml b/connectors-common/mysql-core/pom.xml
index 82d6621ee..817076814 100644
--- a/connectors-common/mysql-core/pom.xml
+++ b/connectors-common/mysql-core/pom.xml
@@ -33,7 +33,7 @@
8
8.0.33
1.5.4.Final
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.0-SNAPSHOT
diff --git a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java
index 14e661a73..7eab47b33 100644
--- a/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java
+++ b/connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlReader.java
@@ -43,7 +43,7 @@
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.ErrorKit;
import io.tapdata.kit.StringKit;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.TapAdvanceFilter;
import org.apache.commons.collections4.CollectionUtils;
@@ -88,7 +88,7 @@ public class MysqlReader implements Closeable {
private final Supplier<Boolean> isAlive;
protected final MysqlJdbcContextV2 mysqlJdbcContext;
private EmbeddedEngine embeddedEngine;
- protected StreamReadConsumer streamReadConsumer;
+ protected StreamReadOneByOneConsumer streamReadConsumer;
private LinkedBlockingQueue eventQueue;
private ScheduledExecutorService mysqlSchemaHistoryMonitor;
protected KVReadOnlyMap tapTableMap;
@@ -221,9 +221,8 @@ public void readWithFilter(TapConnectorContext tapConnectorContext, TapTable tap
}
public void readBinlog(TapConnectorContext tapConnectorContext, List<String> tables,
- Object offset, int batchSize, DDLParserType ddlParserType, LinkedBlockingQueue eventQueue, Map extraConfig) throws Throwable {
+ Object offset, DDLParserType ddlParserType, LinkedBlockingQueue eventQueue, Map extraConfig) throws Throwable {
try {
- batchSize = Math.max(batchSize, MIN_BATCH_SIZE);
initDebeziumServerName(tapConnectorContext);
this.tapTableMap = tapConnectorContext.getTableMap();
this.ddlParserType = ddlParserType;
@@ -275,8 +274,8 @@ public void readBinlog(TapConnectorContext tapConnectorContext, List tab
.with("database.history.store.only.monitored.tables.ddl", true)
.with("database.history.store.only.captured.tables.ddl", true)
.with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.NONE)
- .with("max.queue.size", batchSize * 8)
- .with("max.batch.size", batchSize)
+ .with("max.queue.size", mysqlConfig.getMaximumQueueSize())
+ .with("max.batch.size", mysqlConfig.getMaximumQueueSize() / 8)
.with(MySqlConnectorConfig.SERVER_ID, randomServerId())
.with("time.precision.mode", "adaptive_time_microseconds")
// .with("converters", "time")
@@ -369,7 +368,7 @@ public void taskStarted() {
}
public void readBinlog(TapConnectorContext tapConnectorContext, List<String> tables,
- Object offset, int batchSize, DDLParserType ddlParserType, StreamReadConsumer consumer, HashMap contextMapForMasterSlave) throws Throwable {
+ Object offset, DDLParserType ddlParserType, StreamReadOneByOneConsumer consumer, HashMap contextMapForMasterSlave) throws Throwable {
MysqlUtil.buildMasterNode(mysqlConfig, contextMapForMasterSlave);
try {
initDebeziumServerName(tapConnectorContext);
@@ -513,7 +512,7 @@ public void taskStarted() {
}
})
.using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
- numberOfMessagesSinceLastCommit >= batchSize || timeSinceLastCommit.getSeconds() >= 5)
+ timeSinceLastCommit.getSeconds() >= 5)
.using((result, message, throwable) -> {
tapConnectorContext.configContext();
if (result) {
@@ -672,13 +671,11 @@ private void consumeRecords(List<SourceRecord> sourceRecords, DebeziumEngine.Rec
}
}
if (CollectionUtils.isNotEmpty(mysqlStreamEvents)) {
- List<TapEvent> tapEvents = new ArrayList<>();
MysqlStreamOffset mysqlStreamOffset = null;
for (MysqlStreamEvent mysqlStreamEvent : mysqlStreamEvents) {
- tapEvents.add(mysqlStreamEvent.getTapEvent());
mysqlStreamOffset = mysqlStreamEvent.getMysqlStreamOffset();
+ streamReadConsumer.accept(mysqlStreamEvent.getTapEvent(), mysqlStreamOffset);
}
- streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
}
}
@@ -688,27 +685,18 @@ protected void sourceRecordConsumer(SourceRecord record) {
}
if (null == record || null == record.value()) return;
Schema valueSchema = record.valueSchema();
- List<MysqlStreamEvent> mysqlStreamEvents = new ArrayList<>();
if (null != valueSchema.field("op")) {
MysqlStreamEvent mysqlStreamEvent = wrapDML(record);
- Optional.ofNullable(mysqlStreamEvent).ifPresent(mysqlStreamEvents::add);
+ Optional.ofNullable(mysqlStreamEvent).ifPresent(e -> streamReadConsumer.accept(e.getTapEvent(), e.getMysqlStreamOffset()));
} else if (null != valueSchema.field("ddl")) {
- mysqlStreamEvents = wrapDDL(record);
+ List<MysqlStreamEvent> mysqlStreamEvents = wrapDDL(record);
+ mysqlStreamEvents.forEach(e -> streamReadConsumer.accept(e.getTapEvent(), e.getMysqlStreamOffset()));
} else if ("io.debezium.connector.common.Heartbeat".equals(valueSchema.name())) {
Optional.ofNullable((Struct) record.value())
.map(value -> value.getInt64("ts_ms"))
.map(TapSimplify::heartbeatEvent)
.map(heartbeatEvent -> new MysqlStreamEvent(heartbeatEvent, getMysqlStreamOffset(record)))
- .ifPresent(mysqlStreamEvents::add);
- }
- if (CollectionUtils.isNotEmpty(mysqlStreamEvents)) {
- List<TapEvent> tapEvents = new ArrayList<>();
- MysqlStreamOffset mysqlStreamOffset = null;
- for (MysqlStreamEvent mysqlStreamEvent : mysqlStreamEvents) {
- tapEvents.add(mysqlStreamEvent.getTapEvent());
- mysqlStreamOffset = mysqlStreamEvent.getMysqlStreamOffset();
- }
- streamReadConsumer.accept(tapEvents, mysqlStreamOffset);
+ .ifPresent(e -> streamReadConsumer.accept(e.getTapEvent(), e.getMysqlStreamOffset()));
}
}
diff --git a/connectors-common/pom.xml b/connectors-common/pom.xml
index fb3509acf..1ef79d453 100644
--- a/connectors-common/pom.xml
+++ b/connectors-common/pom.xml
@@ -28,8 +28,8 @@
${project.artifactId}-v${project.version}
8
2.1-SNAPSHOT
- 2.0.1-SNAPSHOT
- 2.0.1-SNAPSHOT
+ 2.0.5-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.0-SNAPSHOT
5.8.1
1.8.1
diff --git a/connectors-common/postgres-core/pom.xml b/connectors-common/postgres-core/pom.xml
index 24fd0f92b..25033cfde 100644
--- a/connectors-common/postgres-core/pom.xml
+++ b/connectors-common/postgres-core/pom.xml
@@ -21,7 +21,7 @@
1.0-SNAPSHOT
1.5.4.Final
1.0-SNAPSHOT
- 2.0.1-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.2.83
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java
index bf81703d5..e48bf14f2 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java
@@ -17,7 +17,7 @@
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.StringKit;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import java.math.BigDecimal;
import java.sql.ResultSet;
@@ -35,8 +35,7 @@ public abstract class AbstractWalLogMiner {
protected final PostgresJdbcContext postgresJdbcContext;
protected final Log tapLogger;
- protected StreamReadConsumer consumer;
- protected int recordSize;
+ protected StreamReadOneByOneConsumer consumer;
protected List<String> tableList;
protected boolean filterSchema;
private Map dataTypeMap;
@@ -95,9 +94,8 @@ public AbstractWalLogMiner withWalLogDirectory(String walLogDirectory) {
public abstract void startMiner(Supplier<Boolean> isAlive) throws Throwable;
- public AbstractWalLogMiner registerConsumer(StreamReadConsumer consumer, int recordSize) {
+ public AbstractWalLogMiner registerConsumer(StreamReadOneByOneConsumer consumer) {
this.consumer = consumer;
- this.recordSize = recordSize;
return this;
}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java
index f3f51840e..83313e674 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java
@@ -26,7 +26,7 @@
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.NumberKit;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -57,8 +57,7 @@ public class PostgresCdcRunner extends DebeziumCdcRunner {
private final TapConnectorContext connectorContext;
private PostgresDebeziumConfig postgresDebeziumConfig;
private PostgresOffset postgresOffset;
- private int recordSize;
- private StreamReadConsumer consumer;
+ private StreamReadOneByOneConsumer consumer;
private final AtomicReference throwableAtomicReference = new AtomicReference<>();
protected TimeZone timeZone;
private String dropTransactionId = null;
@@ -145,8 +144,7 @@ public AtomicReference getThrowable() {
return throwableAtomicReference;
}
- public void registerConsumer(StreamReadConsumer consumer, int recordSize) {
- this.recordSize = recordSize;
+ public void registerConsumer(StreamReadOneByOneConsumer consumer) {
this.consumer = consumer;
//build debezium engine
this.engine = (EmbeddedEngine) EmbeddedEngine.create()
@@ -168,7 +166,7 @@ public void taskStopped() {
// .using(Clock.SYSTEM)
// .notifying(this::consumeRecord)
.using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
- numberOfMessagesSinceLastCommit >= recordSize || timeSinceLastCommit.getSeconds() >= 5)
+ timeSinceLastCommit.getSeconds() >= 5)
.notifying(this::consumeRecords).using((result, message, throwable) -> {
if (result) {
if (StringUtils.isNotBlank(message)) {
@@ -195,7 +193,6 @@ public void taskStopped() {
@Override
public void consumeRecords(List<SourceRecord> sourceRecords, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
super.consumeRecords(sourceRecords, committer);
- List<TapEvent> eventList = TapSimplify.list();
Map offset = null;
for (SourceRecord sr : sourceRecords) {
try {
@@ -207,7 +204,7 @@ public void consumeRecords(List sourceRecords, DebeziumEngine.Reco
continue;
}
if ("io.debezium.connector.common.Heartbeat".equals(sr.valueSchema().name())) {
- eventList.add(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms")));
+ consumer.accept(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms")), postgresOffset);
continue;
} else if (EmptyKit.isNull(sr.valueSchema().field("op"))) {
continue;
@@ -260,22 +257,13 @@ public void consumeRecords(List sourceRecords, DebeziumEngine.Reco
event.setNamespaces(Lists.newArrayList(schema, table));
}
}
- eventList.add(event);
- if (eventList.size() >= recordSize) {
- PostgresOffset postgresOffset = new PostgresOffset();
- postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
- consumer.accept(eventList, postgresOffset);
- eventList = TapSimplify.list();
- }
+ PostgresOffset postgresOffset = new PostgresOffset();
+ postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
+ consumer.accept(event, postgresOffset);
} catch (StopConnectorException | StopEngineException ex) {
throw ex;
}
}
- if (EmptyKit.isNotEmpty(eventList)) {
- PostgresOffset postgresOffset = new PostgresOffset();
- postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
- consumer.accept(eventList, postgresOffset);
- }
}
private DataMap getMapFromStruct(Struct struct) {
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java
index 38d91ffb1..6f69c71df 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java
@@ -3,21 +3,16 @@
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.exception.TapPdkOffsetOutOfLogEx;
import io.tapdata.kit.EmptyKit;
import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import static io.tapdata.base.ConnectorBase.list;
public class WalLogMiner extends AbstractWalLogMiner {
@@ -64,23 +59,11 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
}
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference<List<TapEvent>> events = new AtomicReference<>(list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
if (EmptyKit.isNotNull(redo)) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- } else {
- if (events.get().size() > 0) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
+ consumer.accept(createEvent(redo), redo.getCdcSequenceStr());
}
} catch (Exception e) {
threadException.set(e);
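
The same simplification repeats in all four WAL miners: the lastRedo
bookkeeping and the accumulated events list disappear, and each NormalRedo
is converted and handed to the consumer as soon as it is dequeued, keyed by
its own CDC sequence string.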
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java
index e467fbd56..faaa286bc 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java
@@ -3,7 +3,6 @@
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
@@ -21,8 +20,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import static io.tapdata.base.ConnectorBase.list;
-
public class WalLogMinerV2 extends AbstractWalLogMiner {
private String startLsn;
@@ -46,26 +43,14 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner");
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference<List<TapEvent>> events = new AtomicReference<>(list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
if (EmptyKit.isNotNull(redo)) {
if (EmptyKit.isNotNull(redo.getOperation())) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
+ consumer.accept(createEvent(redo), redo.getCdcSequenceStr());
} else {
- consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
- }
- } else {
- if (events.get().size() > 0) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
+ consumer.accept(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis()), redo.getCdcSequenceStr());
}
}
} catch (Exception e) {
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java
index 605eee1ce..42a987fd2 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java
@@ -3,7 +3,6 @@
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
@@ -15,16 +14,11 @@
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import static io.tapdata.base.ConnectorBase.list;
-
public class WalLogMinerV3 extends AbstractWalLogMiner {
private String timestamp;
@@ -42,26 +36,14 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner");
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference<List<TapEvent>> events = new AtomicReference<>(list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
if (EmptyKit.isNotNull(redo)) {
if (EmptyKit.isNotNull(redo.getOperation())) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
+ consumer.accept(createEvent(redo), redo.getCdcSequenceStr());
} else {
- consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
- }
- } else {
- if (events.get().size() > 0) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
+ consumer.accept(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis()), redo.getCdcSequenceStr());
}
}
} catch (Exception e) {
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java
index 1a278bbc6..43fa1d17c 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java
@@ -1,12 +1,10 @@
package io.tapdata.connector.postgres.cdc;
import com.alibaba.fastjson.JSONObject;
-import io.tapdata.base.ConnectorBase;
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.common.sqlparser.ResultDO;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
@@ -18,7 +16,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class WalPgtoMiner extends AbstractWalLogMiner {
@@ -65,26 +62,14 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
try (ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner")) {
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference<List<TapEvent>> events = new AtomicReference<>(ConnectorBase.list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
if (EmptyKit.isNotNull(redo)) {
if (EmptyKit.isNotNull(redo.getOperation())) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
+ consumer.accept(createEvent(redo), redo.getCdcSequenceStr());
} else {
- consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
- }
- } else {
- if (!events.get().isEmpty()) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
+ consumer.accept(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis()), redo.getCdcSequenceStr());
}
}
} catch (Exception e) {
diff --git a/connectors-common/read-partition/pom.xml b/connectors-common/read-partition/pom.xml
index fc153b3bf..671303bad 100644
--- a/connectors-common/read-partition/pom.xml
+++ b/connectors-common/read-partition/pom.xml
@@ -20,13 +20,13 @@
io.tapdata
tapdata-api
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
provided
io.tapdata
tapdata-pdk-api
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
provided
diff --git a/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqConnector.java b/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqConnector.java
index 9a7bc7109..11cf7031c 100644
--- a/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqConnector.java
+++ b/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqConnector.java
@@ -12,7 +12,7 @@
import io.tapdata.entity.schema.value.TapTimeValue;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.ConnectionOptions;
@@ -61,7 +61,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportConnectionCheckFunction(this::checkConnection);
connectorFunctions.supportWriteRecord(this::writeRecord);
connectorFunctions.supportBatchRead(this::batchRead);
- connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamRead);
connectorFunctions.supportTimestampToStreamOffset(this::timestampToStreamOffset);
}
@@ -102,8 +102,8 @@ private void batchRead(TapConnectorContext tapConnectorContext, TapTable tapTabl
activemqService.consumeOne(tapTable, eventBatchSize, eventsOffsetConsumer);
}
- private void streamRead(TapConnectorContext nodeContext, List<String> tableList, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable {
- activemqService.streamConsume(tableList, recordSize, consumer);
+ private void streamRead(TapConnectorContext nodeContext, List<String> tableList, Object offsetState, StreamReadOneByOneConsumer consumer) throws Throwable {
+ activemqService.streamConsume(tableList, consumer);
}
private Object timestampToStreamOffset(TapConnectorContext connectorContext, Long offsetStartTime) {
diff --git a/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqService.java b/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqService.java
index 98175b618..71581c0eb 100644
--- a/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqService.java
+++ b/connectors/activemq-connector/src/main/java/io/tapdata/connector/activemq/ActivemqService.java
@@ -23,6 +23,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import javax.jms.Queue;
import javax.jms.*;
@@ -221,7 +222,7 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) {
if (list.size() >= eventBatchSize) {
eventsOffsetConsumer.accept(list, TapSimplify.list());
list = TapSimplify.list();
@@ -234,18 +235,17 @@ public void consumeOne(TapTable tapTable, int eventBatchSize, BiConsumer tableList, int eventBatchSize, BiConsumer, Object> eventsOffsetConsumer) throws Throwable {
+ public void streamConsume(List tableList, StreamReadOneByOneConsumer eventsOffsetConsumer) throws Throwable {
consuming.set(true);
Session session = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
List<List<String>> tablesList = Lists.partition(tableList, (tableList.size() - 1) / concurrency + 1);
executorService = Executors.newFixedThreadPool(tablesList.size());
CountDownLatch countDownLatch = new CountDownLatch(tablesList.size());
tablesList.forEach(tables -> executorService.submit(() -> {
- List<TapEvent> list = TapSimplify.list();
Map<String, MessageConsumer> consumerMap = new HashMap<>();
int err = 0;
while (consuming.get() && err < 10) {
- for (String tableName : tables) {
+ for (final String tableName : tables) {
try {
MessageConsumer messageConsumer = consumerMap.get(tableName);
if (EmptyKit.isNull(messageConsumer)) {
@@ -257,11 +257,7 @@ public void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer
if (EmptyKit.isNull(message)) {
continue;
}
- makeMessage(message, list, tableName);
- if (list.size() >= eventBatchSize) {
- eventsOffsetConsumer.accept(list, TapSimplify.list());
- list = TapSimplify.list();
- }
+ Optional.ofNullable(makeMessage(message, tableName)).ifPresent(e -> eventsOffsetConsumer.accept(e, TapSimplify.list()));
} catch (Exception e) {
TapLogger.error(TAG, "error occur when consume queue: {}", tableName, e);
err++;
@@ -269,9 +265,6 @@ public void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer
}
TapSimplify.sleep(50);
}
- if (EmptyKit.isNotEmpty(list)) {
- eventsOffsetConsumer.accept(list, TapSimplify.list());
- }
consumerMap.forEach((key, value) -> {
try {
value.close();
@@ -293,19 +286,17 @@ public void streamConsume(List<String> tableList, int eventBatchSize, BiConsumer
session.close();
}
- private void makeMessage(Message message, List<TapEvent> list, String tableName) throws JMSException {
+ private TapRecordEvent makeMessage(Message message, String tableName) throws JMSException {
TextMessage textMessage = (TextMessage) message;
Map data = jsonParser.fromJson(textMessage.getText(), Map.class);
switch (MqOp.fromValue(textMessage.getStringProperty("mqOp"))) {
case INSERT:
- list.add(new TapInsertRecordEvent().init().table(tableName).after(data).referenceTime(System.currentTimeMillis()));
- break;
+ return new TapInsertRecordEvent().init().table(tableName).after(data).referenceTime(System.currentTimeMillis());
case UPDATE:
- list.add(new TapUpdateRecordEvent().init().table(tableName).after(data).referenceTime(System.currentTimeMillis()));
- break;
+ return new TapUpdateRecordEvent().init().table(tableName).after(data).referenceTime(System.currentTimeMillis());
case DELETE:
- list.add(new TapDeleteRecordEvent().init().table(tableName).before(data).referenceTime(System.currentTimeMillis()));
- break;
+ return new TapDeleteRecordEvent().init().table(tableName).before(data).referenceTime(System.currentTimeMillis());
}
+ return null;
}
}
diff --git a/connectors/activemq-connector/src/main/resources/spec_activemq.json b/connectors/activemq-connector/src/main/resources/spec_activemq.json
index 0b5c39737..eaa2391ad 100644
--- a/connectors/activemq-connector/src/main/resources/spec_activemq.json
+++ b/connectors/activemq-connector/src/main/resources/spec_activemq.json
@@ -4,7 +4,17 @@
"icon": "icons/activemq.png",
"doc": "${doc}",
"id": "activemq",
- "tags": ["Database"]
+ "tags": ["Database"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"connection": {
diff --git a/connectors/aliyun-adb-postgres-connector/pom.xml b/connectors/aliyun-adb-postgres-connector/pom.xml
index fde69c624..6b14c53dc 100644
--- a/connectors/aliyun-adb-postgres-connector/pom.xml
+++ b/connectors/aliyun-adb-postgres-connector/pom.xml
@@ -22,7 +22,7 @@
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aliyun-adb-postgres-connector/src/main/resources/aliyun-adb-postgres-spec.json b/connectors/aliyun-adb-postgres-connector/src/main/resources/aliyun-adb-postgres-spec.json
index 4375d3c57..40583a352 100644
--- a/connectors/aliyun-adb-postgres-connector/src/main/resources/aliyun-adb-postgres-spec.json
+++ b/connectors/aliyun-adb-postgres-connector/src/main/resources/aliyun-adb-postgres-spec.json
@@ -4,7 +4,17 @@
"icon": "icons/aliyun_adb_postgres.png",
"doc" : "${doc}",
"id": "aliyun-adb-postgres",
- "tags": ["Database"]
+ "tags": ["Database"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities":[
diff --git a/connectors/aliyun-mongodb-connector/pom.xml b/connectors/aliyun-mongodb-connector/pom.xml
index 2527d6187..fd7ba5d06 100644
--- a/connectors/aliyun-mongodb-connector/pom.xml
+++ b/connectors/aliyun-mongodb-connector/pom.xml
@@ -23,7 +23,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aliyun-mongodb-connector/src/main/resources/aliyun-mongodb-spec.json b/connectors/aliyun-mongodb-connector/src/main/resources/aliyun-mongodb-spec.json
index 84831a6a2..d77f2e351 100644
--- a/connectors/aliyun-mongodb-connector/src/main/resources/aliyun-mongodb-spec.json
+++ b/connectors/aliyun-mongodb-connector/src/main/resources/aliyun-mongodb-spec.json
@@ -4,7 +4,17 @@
"icon": "icons/aliyun-mongodb.png",
"doc" : "${doc}",
"tags" : ["schema-free","Database"],
- "id": "aliyun-db-mongodb"
+ "id": "aliyun-db-mongodb",
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/aliyun-rds-mariadb-connector/pom.xml b/connectors/aliyun-rds-mariadb-connector/pom.xml
index c55ae7ffa..b1eda00e7 100644
--- a/connectors/aliyun-rds-mariadb-connector/pom.xml
+++ b/connectors/aliyun-rds-mariadb-connector/pom.xml
@@ -23,7 +23,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aliyun-rds-mariadb-connector/src/main/resources/aliyun-rds-mariadb-spec.json b/connectors/aliyun-rds-mariadb-connector/src/main/resources/aliyun-rds-mariadb-spec.json
index 1dbe312ad..c7634d2dd 100644
--- a/connectors/aliyun-rds-mariadb-connector/src/main/resources/aliyun-rds-mariadb-spec.json
+++ b/connectors/aliyun-rds-mariadb-connector/src/main/resources/aliyun-rds-mariadb-spec.json
@@ -4,7 +4,17 @@
"icon": "icons/aliyun_rds_mariadb.png",
"id": "aliyun-rds-mariadb",
"doc": "${doc}",
- "tags": ["Database"]
+ "tags": ["Database"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/aliyun-rds-mysql-connector/pom.xml b/connectors/aliyun-rds-mysql-connector/pom.xml
index 899de4eac..b370499f7 100644
--- a/connectors/aliyun-rds-mysql-connector/pom.xml
+++ b/connectors/aliyun-rds-mysql-connector/pom.xml
@@ -23,7 +23,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aliyun-rds-mysql-connector/src/main/resources/aliyun-rds-mysql-spec.json b/connectors/aliyun-rds-mysql-connector/src/main/resources/aliyun-rds-mysql-spec.json
index 40756d30d..960dd8d9a 100644
--- a/connectors/aliyun-rds-mysql-connector/src/main/resources/aliyun-rds-mysql-spec.json
+++ b/connectors/aliyun-rds-mysql-connector/src/main/resources/aliyun-rds-mysql-spec.json
@@ -4,7 +4,17 @@
"icon": "icons/aliyun_rds_mysql.png",
"id": "aliyun-rds-mysql",
"doc": "${doc}",
- "tags": ["Database","ssl"]
+ "tags": ["Database","ssl"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/aliyun-rds-postgres-connector/pom.xml b/connectors/aliyun-rds-postgres-connector/pom.xml
index cba9f8240..d2882f881 100644
--- a/connectors/aliyun-rds-postgres-connector/pom.xml
+++ b/connectors/aliyun-rds-postgres-connector/pom.xml
@@ -23,7 +23,7 @@
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aliyun-rds-postgres-connector/src/main/resources/aliyun-rds-postgres-spec.json b/connectors/aliyun-rds-postgres-connector/src/main/resources/aliyun-rds-postgres-spec.json
index adb52dd66..91bcd5794 100644
--- a/connectors/aliyun-rds-postgres-connector/src/main/resources/aliyun-rds-postgres-spec.json
+++ b/connectors/aliyun-rds-postgres-connector/src/main/resources/aliyun-rds-postgres-spec.json
@@ -4,7 +4,17 @@
"icon": "icons/aliyun_rds_postgres.png",
"doc" : "${doc}",
"id": "aliyun-rds-postgres",
- "tags": ["Database"]
+ "tags": ["Database"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities":[
diff --git a/connectors/aws-clickhouse-connector/pom.xml b/connectors/aws-clickhouse-connector/pom.xml
index c567a9aa9..6993350f6 100644
--- a/connectors/aws-clickhouse-connector/pom.xml
+++ b/connectors/aws-clickhouse-connector/pom.xml
@@ -19,7 +19,7 @@
1.0-SNAPSHOT
3.12.0
31.0.1-jre
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aws-clickhouse-connector/src/main/resources/aws_clickhouse.json b/connectors/aws-clickhouse-connector/src/main/resources/aws_clickhouse.json
index 993a0ef02..e4cdb54d9 100644
--- a/connectors/aws-clickhouse-connector/src/main/resources/aws_clickhouse.json
+++ b/connectors/aws-clickhouse-connector/src/main/resources/aws_clickhouse.json
@@ -6,7 +6,17 @@
"doc": "${doc}",
"tags": [
"Database"
- ]
+ ],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/aws-rds-mysql-connector/pom.xml b/connectors/aws-rds-mysql-connector/pom.xml
index cf779b9a9..6cd87fa61 100644
--- a/connectors/aws-rds-mysql-connector/pom.xml
+++ b/connectors/aws-rds-mysql-connector/pom.xml
@@ -23,7 +23,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aws-rds-mysql-connector/src/main/resources/aws-rds-mysql-spec.json b/connectors/aws-rds-mysql-connector/src/main/resources/aws-rds-mysql-spec.json
index cbaf71b6e..b16bb26de 100644
--- a/connectors/aws-rds-mysql-connector/src/main/resources/aws-rds-mysql-spec.json
+++ b/connectors/aws-rds-mysql-connector/src/main/resources/aws-rds-mysql-spec.json
@@ -4,7 +4,17 @@
"icon": "icons/aws_rds_mysql.png",
"id": "aws-rds-mysql",
"doc": "${doc}",
- "tags": ["Database"]
+ "tags": ["Database"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/azure-cosmosdb-connector/pom.xml b/connectors/azure-cosmosdb-connector/pom.xml
index e37151837..13125f917 100644
--- a/connectors/azure-cosmosdb-connector/pom.xml
+++ b/connectors/azure-cosmosdb-connector/pom.xml
@@ -19,7 +19,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/azure-cosmosdb-connector/src/main/resources/azure-cosmosdb-spec.json b/connectors/azure-cosmosdb-connector/src/main/resources/azure-cosmosdb-spec.json
index ac3cfa05d..f35cd8c4f 100644
--- a/connectors/azure-cosmosdb-connector/src/main/resources/azure-cosmosdb-spec.json
+++ b/connectors/azure-cosmosdb-connector/src/main/resources/azure-cosmosdb-spec.json
@@ -7,7 +7,17 @@
"schema-free",
"Database"
],
- "id": "azure-cosmosdb"
+ "id": "azure-cosmosdb",
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/clickhouse-connector/pom.xml b/connectors/clickhouse-connector/pom.xml
index 8d9147681..0773fff8a 100644
--- a/connectors/clickhouse-connector/pom.xml
+++ b/connectors/clickhouse-connector/pom.xml
@@ -19,7 +19,7 @@
1.0-SNAPSHOT
3.12.0
31.0.1-jre
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json b/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json
index bac4e3817..0acf2fa36 100644
--- a/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json
+++ b/connectors/clickhouse-connector/src/main/resources/spec_clickhouse.json
@@ -6,7 +6,17 @@
"doc": "${doc}",
"tags": [
"Database"
- ]
+ ],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/highgo-connector/pom.xml b/connectors/highgo-connector/pom.xml
index 6aec28edf..dbbd66358 100644
--- a/connectors/highgo-connector/pom.xml
+++ b/connectors/highgo-connector/pom.xml
@@ -15,7 +15,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.5.4.Final
diff --git a/connectors/highgo-connector/src/main/java/io/tapdata/connector/postgres/HighgoConnector.java b/connectors/highgo-connector/src/main/java/io/tapdata/connector/postgres/HighgoConnector.java
index 3e66d1c4a..fdf367b68 100644
--- a/connectors/highgo-connector/src/main/java/io/tapdata/connector/postgres/HighgoConnector.java
+++ b/connectors/highgo-connector/src/main/java/io/tapdata/connector/postgres/HighgoConnector.java
@@ -20,7 +20,6 @@
import io.tapdata.entity.event.ddl.table.TapNewFieldEvent;
import io.tapdata.entity.event.dml.TapRecordEvent;
import io.tapdata.entity.schema.TapField;
-import io.tapdata.entity.schema.TapIndex;
import io.tapdata.entity.schema.TapTable;
import io.tapdata.entity.schema.type.TapType;
import io.tapdata.entity.schema.value.*;
@@ -34,7 +33,7 @@
import io.tapdata.kit.ErrorKit;
import io.tapdata.kit.StringKit;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.ConnectionOptions;
@@ -110,7 +109,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
// source
connectorFunctions.supportBatchCount(this::batchCount);
connectorFunctions.supportBatchRead(this::batchReadWithoutOffset);
- connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamRead);
connectorFunctions.supportTimestampToStreamOffset(this::timestampToStreamOffset);
// query
connectorFunctions.supportQueryByFilter(this::queryByFilter);
@@ -396,11 +395,11 @@ protected void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent>
- private void streamRead(TapConnectorContext nodeContext, List<String> tableList, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable {
+ private void streamRead(TapConnectorContext nodeContext, List<String> tableList, Object offsetState, StreamReadOneByOneConsumer consumer) throws Throwable {
cdcRunner = new PostgresCdcRunner(postgresJdbcContext, nodeContext);
testReplicateIdentity(nodeContext.getTableMap());
buildSlot(nodeContext, true);
- cdcRunner.useSlot(slotName.toString()).watch(tableList).offset(offsetState).registerConsumer(consumer, recordSize);
+ cdcRunner.useSlot(slotName.toString()).watch(tableList).offset(offsetState).registerConsumer(consumer);
cdcRunner.startCdcRunner();
if (EmptyKit.isNotNull(cdcRunner) && EmptyKit.isNotNull(cdcRunner.getThrowable().get())) {
Throwable throwable = ErrorKit.getLastCause(cdcRunner.getThrowable().get());
diff --git a/connectors/highgo-connector/src/main/resources/spec_highgo.json b/connectors/highgo-connector/src/main/resources/spec_highgo.json
index 220e21c8e..9ee445c56 100644
--- a/connectors/highgo-connector/src/main/resources/spec_highgo.json
+++ b/connectors/highgo-connector/src/main/resources/spec_highgo.json
@@ -4,7 +4,17 @@
"icon": "icons/highgo.svg",
"doc" : "${doc}",
"id": "highgo",
- "tags": ["Database", "ssl", "doubleActive"]
+ "tags": ["Database", "ssl", "doubleActive"],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities":[
diff --git a/connectors/kafka-connector/pom.xml b/connectors/kafka-connector/pom.xml
index 9073eaaed..738b0b445 100644
--- a/connectors/kafka-connector/pom.xml
+++ b/connectors/kafka-connector/pom.xml
@@ -12,7 +12,7 @@
kafka-connector
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/kafka-connector/src/main/java/io/tapdata/connector/kafka/KafkaConnector.java b/connectors/kafka-connector/src/main/java/io/tapdata/connector/kafka/KafkaConnector.java
index fdf90c094..986e1cb3f 100644
--- a/connectors/kafka-connector/src/main/java/io/tapdata/connector/kafka/KafkaConnector.java
+++ b/connectors/kafka-connector/src/main/java/io/tapdata/connector/kafka/KafkaConnector.java
@@ -19,7 +19,7 @@
import io.tapdata.entity.schema.value.TapTimeValue;
import io.tapdata.kit.EmptyKit;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.Capability;
@@ -143,7 +143,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportConnectionCheckFunction(this::checkConnection);
connectorFunctions.supportWriteRecord(this::writeRecord);
connectorFunctions.supportBatchRead(this::batchRead);
- connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamRead);
connectorFunctions.supportTimestampToStreamOffset(this::timestampToStreamOffset);
connectorFunctions.supportNewFieldFunction(this::fieldDDLHandler);
@@ -310,9 +310,9 @@ private void batchRead(TapConnectorContext tapConnectorContext, TapTable tapTabl
}
}
- private void streamRead(TapConnectorContext nodeContext, List<String> tableList, Object offsetState, int recordSize, StreamReadConsumer consumer) {
+ private void streamRead(TapConnectorContext nodeContext, List<String> tableList, Object offsetState, StreamReadOneByOneConsumer consumer) {
try {
- kafkaService.streamConsume(tableList, offsetState, recordSize, consumer);
+ kafkaService.streamConsume(tableList, offsetState, consumer);
} catch (Throwable e) {
kafkaExceptionCollector.collectTerminateByServer(e);
kafkaExceptionCollector.collectUserPwdInvalid(kafkaConfig.getMqUsername(), e);
diff --git a/connectors/kafka-connector/src/main/resources/spec_kafka.json b/connectors/kafka-connector/src/main/resources/spec_kafka.json
index a57dc1f37..5b938fe48 100644
--- a/connectors/kafka-connector/src/main/resources/spec_kafka.json
+++ b/connectors/kafka-connector/src/main/resources/spec_kafka.json
@@ -7,7 +7,17 @@
"tags": [
"Database",
"schema-free"
- ]
+ ],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"supportDDL": {
diff --git a/connectors/mariadb-connector/pom.xml b/connectors/mariadb-connector/pom.xml
index 0f25688ec..8423db40c 100644
--- a/connectors/mariadb-connector/pom.xml
+++ b/connectors/mariadb-connector/pom.xml
@@ -16,7 +16,7 @@
4.0.3
1.5.4.Final
4.4
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/mariadb-connector/src/main/resources/spec_mariadb.json b/connectors/mariadb-connector/src/main/resources/spec_mariadb.json
index b7aae1fbc..036c250d2 100644
--- a/connectors/mariadb-connector/src/main/resources/spec_mariadb.json
+++ b/connectors/mariadb-connector/src/main/resources/spec_mariadb.json
@@ -6,7 +6,17 @@
"doc": "${doc}",
"tags": [
"Database", "ssl"
- ]
+ ],
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities":[
diff --git a/connectors/mongodb-connector/pom.xml b/connectors/mongodb-connector/pom.xml
index b4fc0b6bb..adf53fca6 100644
--- a/connectors/mongodb-connector/pom.xml
+++ b/connectors/mongodb-connector/pom.xml
@@ -13,7 +13,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
index 9e889f707..15cad69cf 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
@@ -39,7 +39,7 @@
import io.tapdata.mongodb.writer.MongodbWriter;
import io.tapdata.partition.DatabaseReadPartitionSplitter;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.*;
@@ -553,7 +553,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportBatchCount(this::batchCount);
connectorFunctions.supportCreateIndex(this::createIndex);
connectorFunctions.supportCreateTableV2(this::createTableV2);
- connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamRead);
connectorFunctions.supportTimestampToStreamOffset(this::streamOffset);
connectorFunctions.supportErrorHandleFunction(this::errorHandle);
@@ -1619,22 +1619,22 @@ protected Object streamOffset(TapConnectorContext connectorContext, Long offsetS
* @param connectorContext // * @param offset
* // * @param consumer
*/
- protected void streamRead(TapConnectorContext connectorContext, List<String> tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
+ protected void streamRead(TapConnectorContext connectorContext, List<String> tableList, Object offset, StreamReadOneByOneConsumer consumer) {
int size = tableList.size();
MongoCdcOffset mongoCdcOffset = MongoCdcOffset.fromOffset(offset);
- streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
+ streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), consumer);
if (size == tableList.size() || !tableList.isEmpty()) {
if (mongodbStreamReader == null) {
mongodbStreamReader = createStreamReader();
}
mongodbStreamReader.onStart(mongoConfig);
- doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset(), eventBatchSize, consumer);
+ doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset(), consumer);
}
}
- protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContext connectorContext, List<String> tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
+ protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContext connectorContext, List<String> tableList, Object offset, StreamReadOneByOneConsumer consumer) {
try {
- streamReader.read(connectorContext, tableList, offset, eventBatchSize, consumer);
+ streamReader.read(connectorContext, tableList, offset, consumer);
} catch (Exception e) {
exceptionCollector.collectTerminateByServer(e);
exceptionCollector.collectWritePrivileges(e);
@@ -1648,7 +1648,7 @@ protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContex
}
}
- protected void streamReadOpLog(TapConnectorContext connectorContext, List<String> tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
+ protected void streamReadOpLog(TapConnectorContext connectorContext, List<String> tableList, Object offset, StreamReadOneByOneConsumer consumer) {
String database = mongoConfig.getDatabase();
if (StreamWithOpLogCollection.OP_LOG_DB.equals(database) && tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)) {
connectorContext.getLog().info("Start read oplog collection, db: local");
@@ -1658,7 +1658,7 @@ protected void streamReadOpLog(TapConnectorContext connectorContext, List<String> tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
- sourceRunnerFuture = sourceRunner.submit(() -> doStreamRead(opLogStreamReader, connectorContext, list(StreamWithOpLogCollection.OP_LOG_COLLECTION), offset, eventBatchSize, consumer));
+ sourceRunnerFuture = sourceRunner.submit(() -> doStreamRead(opLogStreamReader, connectorContext, list(StreamWithOpLogCollection.OP_LOG_COLLECTION), offset, consumer));
}
}
}
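
Net effect of the MongodbConnector changes: streamRead splits the offset into an oplog part and a change-stream part, then hands the same one-by-one consumer to both readers, with the oplog read submitted to the sourceRunner thread. A condensed view of the new control flow, paraphrasing the hunks above rather than excerpting them verbatim:

    // Condensed from the '+' lines above; not a verbatim excerpt.
    MongoCdcOffset mongoCdcOffset = MongoCdcOffset.fromOffset(offset);
    streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), consumer); // oplog side, may run async
    if (mongodbStreamReader == null) {
        mongodbStreamReader = createStreamReader();
    }
    mongodbStreamReader.onStart(mongoConfig);
    doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset(), consumer); // change-stream side
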
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java
index cefd9f403..a6bc9a68b 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java
@@ -1,7 +1,7 @@
package io.tapdata.mongodb.reader;
import io.tapdata.mongodb.entity.MongodbConfig;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import java.util.List;
@@ -13,7 +13,7 @@ public interface MongodbStreamReader {
void onStart(MongodbConfig mongodbConfig);
- void read(TapConnectorContext connectorContext, List<String> tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception;
+ void read(TapConnectorContext connectorContext, List<String> tableList, Object offset, StreamReadOneByOneConsumer consumer) throws Exception;
Object streamOffset(Long offsetStartTime);
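
After this change every implementation pushes events as they arrive instead of accumulating them. A skeleton of the revised contract, assuming the members shown in this hunk are the interface's only abstract methods:

    // Illustrative only; the real interface may declare additional members.
    public class LoggingStreamReader implements MongodbStreamReader {
        @Override
        public void onStart(MongodbConfig mongodbConfig) { /* open client */ }

        @Override
        public void read(TapConnectorContext connectorContext, List<String> tableList,
                         Object offset, StreamReadOneByOneConsumer consumer) throws Exception {
            // For each change event: consumer.accept(tapEvent, nextOffset);
        }

        @Override
        public Object streamOffset(Long offsetStartTime) { return offsetStartTime; }
    }
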
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java
index c2425ff21..803c01933 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java
@@ -24,7 +24,7 @@
import io.tapdata.mongodb.MongodbUtil;
import io.tapdata.mongodb.entity.MongodbConfig;
import io.tapdata.mongodb.util.MongodbLookupUtil;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.commons.collections4.MapUtils;
import org.bson.*;
@@ -34,7 +34,6 @@
import org.bson.io.ByteBufferBsonInput;
import java.nio.ByteBuffer;
-import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,7 +88,7 @@ public void onStart(MongodbConfig mongodbConfig) {
}
@Override
- public void read(TapConnectorContext connectorContext, List<String> tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception {
+ public void read(TapConnectorContext connectorContext, List<String> tableList, Object offset, StreamReadOneByOneConsumer consumer) throws Exception {
openChangeStreamPreAndPostImages(tableList);
if (Boolean.TRUE.equals(mongodbConfig.getDoubleActive())) {
tableList.add("_tap_double_active");
@@ -129,33 +128,11 @@ public void read(TapConnectorContext connectorContext, List<String> tableList, O
AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
try (final MongoChangeStreamCursor<ChangeStreamDocument<RawBsonDocument>> streamCursor = changeStream.cursor()) {
consumeStreamEventThread = new Thread(() -> {
- List<TapEvent> events = list();
- OffsetEvent lastOffsetEvent = null;
- long lastSendTime = System.currentTimeMillis();
- // Calculate time window based on batch size
- // If batch size <= 500, use fixed 50ms window
- // If batch size > 500, use dynamic window = batch size / 10 (ms)
- long timeWindowMs = eventBatchSize <= 500 ? 50 : eventBatchSize / 10;
while (running.get()) {
try {
OffsetEvent event = concurrentProcessor.get(10, TimeUnit.MILLISECONDS);
if (EmptyKit.isNotNull(event)) {
- lastOffsetEvent = event;
- events.add(event.getEvent());
- // Check batch size OR time window
- if (events.size() >= eventBatchSize ||
- (System.currentTimeMillis() - lastSendTime > timeWindowMs)) {
- consumer.accept(events, event.getOffset());
- events.clear();
- lastSendTime = System.currentTimeMillis();
- }
- } else {
- // Send remaining events when queue is empty
- if (!events.isEmpty() && lastOffsetEvent != null) {
- consumer.accept(events, lastOffsetEvent.getOffset());
- events.clear();
- lastSendTime = System.currentTimeMillis();
- }
+ consumer.accept(event.getEvent(), event.getOffset());
}
} catch (Exception e) {
throwableAtomicReference.set(e);
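
The deleted block was the connector's own micro-batching: it buffered events up to eventBatchSize and flushed on a time window (a fixed 50 ms for batch sizes up to 500, otherwise batchSize / 10 ms), draining leftovers when the queue went idle. With one-by-one delivery that machinery moves to the engine, and the consume loop collapses to a single accept per drained event, reassembled here from the surviving '+' lines:

    // Reassembled from the new lines above; batching is now the engine's job.
    while (running.get()) {
        try {
            OffsetEvent event = concurrentProcessor.get(10, TimeUnit.MILLISECONDS);
            if (EmptyKit.isNotNull(event)) {
                consumer.accept(event.getEvent(), event.getOffset()); // one event, one offset
            }
        } catch (Exception e) {
            throwableAtomicReference.set(e);
        }
    }
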
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java
index 16e7e87d2..643356f8c 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java
@@ -9,7 +9,6 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import io.tapdata.entity.event.TapBaseEvent;
-import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.TapLogger;
@@ -21,7 +20,7 @@
import io.tapdata.mongodb.reader.MongodbStreamReader;
import io.tapdata.mongodb.util.IntervalReport;
import io.tapdata.mongodb.util.MongodbLookupUtil;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -65,8 +64,6 @@ public class MongodbV3StreamReader implements MongodbStreamReader {
private ThreadPoolExecutor replicaSetReadThreadPool;
- private LinkedBlockingDeque<TapEvent> tapEventQueue;
-
private Exception error;
private KVMap