diff --git a/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java new file mode 100644 index 00000000..3497ad01 --- /dev/null +++ b/plugin-kit/tapdata-api/src/main/java/io/tapdata/entity/event/TapCallbackOffset.java @@ -0,0 +1,114 @@ +package io.tapdata.entity.event; + +import java.util.HashMap; + +/** + * TapCallbackOffset - 封装 flush offset callback 需要的数据 + * 用于在数据刷新成功后,将 offset 信息传递给目标节点 + */ +public class TapCallbackOffset extends HashMap { + + private static final long serialVersionUID = 1L; + + // 从 TapRecordEvent 中提取的 offset 信息 + public static final String KEY_BATCH_OFFSET = "batchOffset"; + public static final String KEY_STREAM_OFFSET = "streamOffset"; + public static final String KEY_TABLE_ID = "tableId"; + public static final String KEY_SYNC_STAGE = "syncStage"; + public static final String KEY_SOURCE_TIME = "sourceTime"; + public static final String KEY_EVENT_TIME = "eventTime"; + public static final String KEY_NODE_IDS = "nodeIds"; + + public TapCallbackOffset() { + super(); + } + + /** + * 设置批次偏移量(用于全量同步) + */ + public TapCallbackOffset batchOffset(Object batchOffset) { + if (batchOffset != null) { + put(KEY_BATCH_OFFSET, batchOffset); + } + return this; + } + + /** + * 设置流偏移量(用于增量同步) + */ + public TapCallbackOffset streamOffset(Object streamOffset) { + if (streamOffset != null) { + put(KEY_STREAM_OFFSET, streamOffset); + } + return this; + } + + /** + * 设置表ID + */ + public TapCallbackOffset tableId(String tableId) { + if (tableId != null) { + put(KEY_TABLE_ID, tableId); + } + return this; + } + + /** + * 设置同步阶段(INITIAL_SYNC 或 CDC) + */ + public TapCallbackOffset syncStage(String syncStage) { + if (syncStage != null) { + put(KEY_SYNC_STAGE, syncStage); + } + return this; + } + + /** + * 设置源时间 + */ + public TapCallbackOffset sourceTime(Long sourceTime) { + if (sourceTime != null) { + put(KEY_SOURCE_TIME, sourceTime); + } + return this; + } + + /** + * 设置事件时间 + */ + public TapCallbackOffset eventTime(Long eventTime) { + if (eventTime != null) { + put(KEY_EVENT_TIME, eventTime); + } + return this; + } + + /** + * 设置节点ID列表 + */ + public TapCallbackOffset nodeIds(Object nodeIds) { + if (nodeIds != null) { + put(KEY_NODE_IDS, nodeIds); + } + return this; + } + /** + * 检查是否有有效的 offset 数据 + */ + public boolean hasValidOffset() { + return get(KEY_BATCH_OFFSET) != null || get(KEY_STREAM_OFFSET) != null; + } + + @Override + public String toString() { + return "TapCallbackOffset{" + + "batchOffset=" + get(KEY_BATCH_OFFSET) + + ", streamOffset=" + get(KEY_STREAM_OFFSET) + + ", tableId='" + get(KEY_TABLE_ID) + '\'' + + ", syncStage='" + get(KEY_SYNC_STAGE) + '\'' + + ", sourceTime=" + get(KEY_SOURCE_TIME) + + ", eventTime=" + get(KEY_EVENT_TIME) + + ", nodeIds=" + get(KEY_NODE_IDS) + + '}'; + } +} diff --git a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java index fec0b470..82000bd6 100644 --- a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java +++ b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/context/TapConnectionContext.java @@ -8,10 +8,12 @@ import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; public class TapConnectionContext extends TapContext { protected DataMap connectionConfig; protected DataMap nodeConfig; + protected Consumer flushOffsetCallback; public TapConnectionContext(TapNodeSpecification specification, DataMap connectionConfig, DataMap nodeConfig, Log log) { super(specification); @@ -36,6 +38,13 @@ public void setNodeConfig(DataMap nodeConfig) { this.nodeConfig = nodeConfig; } + public Consumer getFlushOffsetCallback() { + return flushOffsetCallback; + } + + public void setFlushOffsetCallback(Consumer flushOffsetCallback) { + this.flushOffsetCallback = flushOffsetCallback; + } public String toString() { return "TapConnectionContext connectionConfig: " + (connectionConfig != null ? Objects.requireNonNull(InstanceFactory.instance(JsonParser.class)).toJson(connectionConfig) : "") + "nodeConfig: " + (nodeConfig != null ? Objects.requireNonNull(InstanceFactory.instance(JsonParser.class)).toJson(nodeConfig) : "") + " spec: " + specification + " id: " + id; diff --git a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java index 536d03d8..6d8549fb 100644 --- a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java +++ b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/entity/ConnectionOptions.java @@ -55,6 +55,7 @@ public class ConnectionOptions { public static final String SOURCE_SUPPORT_PARTITION = "source_support_partition"; public static final String TARGET_SUPPORT_PARTITION = "target_support_partition"; + public static final String FLUSH_OFFSET_CALLBACK = "flush_offset_callback"; /** * Instance unique id to identify the same instance among multiple connections.