From 4dbe2e09609a4666fb9aa6ae22af50137bfe8ff0 Mon Sep 17 00:00:00 2001 From: mnianqi Date: Thu, 5 Feb 2026 17:36:02 +0800 Subject: [PATCH 1/3] fix: TAP-9678 StarRocks cache data is incomplete --- .../entity/event/TapCallbackOffset.java | 114 ++++++++++++++++++ .../apis/context/TapConnectionContext.java | 9 ++ .../pdk/apis/entity/ConnectionOptions.java | 1 + 3 files changed, 124 insertions(+) create mode 100644 plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java diff --git a/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java new file mode 100644 index 00000000..eaeffe97 --- /dev/null +++ b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java @@ -0,0 +1,114 @@ +package io.tapdata.entity.event; + +import java.util.HashMap; + +/** + * TapCallbackOffset - 封装 flush offset callback 需要的数据 + * 用于在数据刷新成功后,将 offset 信息传递给目标节点 + */ +public class TapCallbackOffset extends HashMap { + + private static final long serialVersionUID = 1L; + + // 从 TapRecordEvent 中提取的 offset 信息 + private static final String KEY_BATCH_OFFSET = "batchOffset"; + private static final String KEY_STREAM_OFFSET = "streamOffset"; + private static final String KEY_TABLE_ID = "tableId"; + private static final String KEY_SYNC_STAGE = "syncStage"; + private static final String KEY_SOURCE_TIME = "sourceTime"; + private static final String KEY_EVENT_TIME = "eventTime"; + private static final String KEY_NODE_IDS = "nodeIds"; + + public TapCallbackOffset() { + super(); + } + + /** + * 设置批次偏移量(用于全量同步) + */ + public TapCallbackOffset batchOffset(Object batchOffset) { + if (batchOffset != null) { + put(KEY_BATCH_OFFSET, batchOffset); + } + return this; + } + + /** + * 设置流偏移量(用于增量同步) + */ + public TapCallbackOffset streamOffset(Object streamOffset) { + if (streamOffset != null) { + put(KEY_STREAM_OFFSET, streamOffset); + } + return this; + } + + /** + * 设置表ID + */ + public TapCallbackOffset tableId(String tableId) { + if (tableId != null) { + put(KEY_TABLE_ID, tableId); + } + return this; + } + + /** + * 设置同步阶段(INITIAL_SYNC 或 CDC) + */ + public TapCallbackOffset syncStage(String syncStage) { + if (syncStage != null) { + put(KEY_SYNC_STAGE, syncStage); + } + return this; + } + + /** + * 设置源时间 + */ + public TapCallbackOffset sourceTime(Long sourceTime) { + if (sourceTime != null) { + put(KEY_SOURCE_TIME, sourceTime); + } + return this; + } + + /** + * 设置事件时间 + */ + public TapCallbackOffset eventTime(Long eventTime) { + if (eventTime != null) { + put(KEY_EVENT_TIME, eventTime); + } + return this; + } + + /** + * 设置节点ID列表 + */ + public TapCallbackOffset nodeIds(Object nodeIds) { + if (nodeIds != null) { + put(KEY_NODE_IDS, nodeIds); + } + return this; + } + /** + * 检查是否有有效的 offset 数据 + */ + public boolean hasValidOffset() { + return get(KEY_BATCH_OFFSET) != null || get(KEY_STREAM_OFFSET) != null; + } + + @Override + public String toString() { + return "TapOffset{" + + "batchOffset=" + get(KEY_BATCH_OFFSET) + + ", streamOffset=" + get(KEY_STREAM_OFFSET) + + ", tableId='" + get(KEY_TABLE_ID) + '\'' + + ", syncStage='" + get(KEY_SYNC_STAGE) + '\'' + + ", sourceTime=" + get(KEY_SOURCE_TIME) + + ", eventTime=" + get(KEY_EVENT_TIME) + + ", nodeIds=" + get(KEY_NODE_IDS) + + '}'; + } +} diff --git a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java index fec0b470..82000bd6 100644 --- a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java +++ b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java @@ -8,10 +8,12 @@ import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; public class TapConnectionContext extends TapContext { protected DataMap connectionConfig; protected DataMap nodeConfig; + protected Consumer flushOffsetCallback; public TapConnectionContext(TapNodeSpecification specification, DataMap connectionConfig, DataMap nodeConfig, Log log) { super(specification); @@ -36,6 +38,13 @@ public void setNodeConfig(DataMap nodeConfig) { this.nodeConfig = nodeConfig; } + public Consumer getFlushOffsetCallback() { + return flushOffsetCallback; + } + + public void setFlushOffsetCallback(Consumer flushOffsetCallback) { + this.flushOffsetCallback = flushOffsetCallback; + } public String toString() { return "TapConnectionContext connectionConfig: " + (connectionConfig != null ? Objects.requireNonNull(InstanceFactory.instance(JsonParser.class)).toJson(connectionConfig) : "") + "nodeConfig: " + (nodeConfig != null ? Objects.requireNonNull(InstanceFactory.instance(JsonParser.class)).toJson(nodeConfig) : "") + " spec: " + specification + " id: " + id; diff --git a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java index 536d03d8..6d8549fb 100644 --- a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java +++ b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java @@ -55,6 +55,7 @@ public class ConnectionOptions { public static final String SOURCE_SUPPORT_PARTITION = "source_support_partition"; public static final String TARGET_SUPPORT_PARTITION = "target_support_partition"; + public static final String FLUSH_OFFSET_CALLBACK = "flush_offset_callback"; /** * Instance unique id to identify the same instance among multiple connections. From dd8b0718c0c173e1d9ae3dceb1a7c077262c7bcd Mon Sep 17 00:00:00 2001 From: mnianqi Date: Thu, 5 Feb 2026 18:53:07 +0800 Subject: [PATCH 2/3] fix: TAP-9678 StarRocks cache data is incomplete --- .../main/java/io/tapdata/entity/event/TapCallbackOffset.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java index eaeffe97..31bd0fcb 100644 --- a/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java +++ b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java @@ -101,7 +101,7 @@ public boolean hasValidOffset() { @Override public String toString() { - return "TapOffset{" + + return "TapCallbackOffset{" + "batchOffset=" + get(KEY_BATCH_OFFSET) + ", streamOffset=" + get(KEY_STREAM_OFFSET) + ", tableId='" + get(KEY_TABLE_ID) + '\'' + From fd1e256d7361ef991c815935e71f0d88eb6b10a1 Mon Sep 17 00:00:00 2001 From: mnianqi Date: Mon, 9 Feb 2026 09:25:59 +0800 Subject: [PATCH 3/3] fix: TAP-9678 resolve augment code suggestions --- .../io/tapdata/entity/event/TapCallbackOffset.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java index 31bd0fcb..3497ad01 100644 --- a/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java +++ b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java @@ -11,13 +11,13 @@ public class TapCallbackOffset extends HashMap { private static final long serialVersionUID = 1L; // 从 TapRecordEvent 中提取的 offset 信息 - private static final String KEY_BATCH_OFFSET = "batchOffset"; - private static final String KEY_STREAM_OFFSET = "streamOffset"; - private static final String KEY_TABLE_ID = "tableId"; - private static final String KEY_SYNC_STAGE = "syncStage"; - private static final String KEY_SOURCE_TIME = "sourceTime"; - private static final String KEY_EVENT_TIME = "eventTime"; - private static final String KEY_NODE_IDS = "nodeIds"; + public static final String KEY_BATCH_OFFSET = "batchOffset"; + public static final String KEY_STREAM_OFFSET = "streamOffset"; + public static final String KEY_TABLE_ID = "tableId"; + public static final String KEY_SYNC_STAGE = "syncStage"; + public static final String KEY_SOURCE_TIME = "sourceTime"; + public static final String KEY_EVENT_TIME = "eventTime"; + public static final String KEY_NODE_IDS = "nodeIds"; public TapCallbackOffset() { super();