From 325e3e4254d30260e6ced560de03cb003a41f4e0 Mon Sep 17 00:00:00 2001 From: mnianqi Date: Wed, 4 Feb 2026 16:00:35 +0800 Subject: [PATCH] fix: TAP-9678 StarRocks cache data is incomplete --- .../apis/functions/ConnectorFunctions.java | 10 ++++++++ .../target/WriteRecordCallbackFunction.java | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/connector/target/WriteRecordCallbackFunction.java diff --git a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/ConnectorFunctions.java b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/ConnectorFunctions.java index 2d5f40d..c02e240 100644 --- a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/ConnectorFunctions.java +++ b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/ConnectorFunctions.java @@ -82,6 +82,12 @@ public class ConnectorFunctions extends ConnectionFunctions protected GetStreamOffsetFunction getStreamOffsetFunction; protected ExportEventSqlFunction exportEventSqlFunction; protected FlushOffsetFunction flushOffsetFunction; + protected WriteRecordCallbackFunction writeRecordCallbackFunction; + + public ConnectorFunctions supportWriteRecordCallback(WriteRecordCallbackFunction function) { + writeRecordCallbackFunction = function; + return this; + } public ConnectorFunctions supportFlushOffsetFunction(FlushOffsetFunction function) { flushOffsetFunction = function; @@ -606,4 +612,8 @@ public ExportEventSqlFunction getExportEventSqlFunction() { public FlushOffsetFunction getFlushOffsetFunction() { return flushOffsetFunction; } + + public WriteRecordCallbackFunction getWriteRecordCallbackFunction() { + return writeRecordCallbackFunction; + } } diff --git a/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/connector/target/WriteRecordCallbackFunction.java b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/connector/target/WriteRecordCallbackFunction.java new file mode 100644 index 0000000..569edc8 --- /dev/null +++ b/plugin-kit/tapdata-pdk-api/src/main/java/io/tapdata/pdk/apis/functions/connector/target/WriteRecordCallbackFunction.java @@ -0,0 +1,23 @@ +package io.tapdata.pdk.apis.functions.connector.target; + +import io.tapdata.entity.event.dml.TapRecordEvent; +import io.tapdata.entity.schema.TapTable; +import io.tapdata.pdk.apis.context.TapConnectorContext; +import io.tapdata.pdk.apis.entity.WriteListResult; +import io.tapdata.pdk.apis.functions.connector.TapConnectorFunction; + +import java.util.List; +import java.util.function.Consumer; + +public interface WriteRecordCallbackFunction extends TapConnectorFunction { + + /** + * insert, update, delete events. + * + * @param connectorContext + * @param recordEvents + * @param consumer + */ + void writeRecordCallback(TapConnectorContext connectorContext, Consumer consumer) throws Throwable; + +}