diff --git a/connectors/starrocks-connector/pom.xml b/connectors/starrocks-connector/pom.xml
index a353a052a..81dd32a25 100644
--- a/connectors/starrocks-connector/pom.xml
+++ b/connectors/starrocks-connector/pom.xml
@@ -21,7 +21,7 @@
31.0.1-jre
1.0-SNAPSHOT
- 2.0.0-SNAPSHOT
+ 2.0.4-SNAPSHOT
diff --git a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java
index 7475028f5..0dcdf777e 100644
--- a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java
+++ b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java
@@ -161,6 +161,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportAlterFieldNameFunction(this::fieldDDLHandler);
connectorFunctions.supportAlterFieldAttributesFunction(this::fieldDDLHandler);
connectorFunctions.supportDropFieldFunction(this::fieldDDLHandler);
+ connectorFunctions.supportWriteRecordCallback(this::writeRecordCallback);
}
@@ -197,6 +198,12 @@ private void writeRecord(TapConnectorContext connectorContext, List writeResultConsumer) throws Throwable {
+ if (checkStreamLoad()) {
+ getStarrocksStreamLoader().writeRecordCallback(writeResultConsumer);
+ }
+ }
+
protected CreateTableOptions createStarrocksTable(TapConnectorContext connectorContext, TapCreateTableEvent createTableEvent) throws SQLException {
TapTable tapTable = createTableEvent.getTable();
CreateTableOptions createTableOptions = new CreateTableOptions();
diff --git a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java
index 6626e3484..f3f6a5b5e 100644
--- a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java
+++ b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java
@@ -89,7 +89,6 @@ public class StarrocksStreamLoader {
private long lastMemoryCheckTime = 0;
private static final long MEMORY_CHECK_INTERVAL = 30000; // 30秒检查一次内存
private final Log taplogger;
- private boolean cannotClean;
private AtomicReference globalException = new AtomicReference<>();
public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map httpClientMap, boolean useHttps, Log taplogger) {
@@ -499,6 +498,28 @@ public void writeRecord(final List tapRecordEvents, final TapTab
}
}
+    /**
+     * Callback that reports whether any table still has cached data waiting to be flushed.
+     *
+     * @param writeResultConsumer consumer that receives a single boolean derived from {@code pendingFlushTables}:
+     *                            - true: there is cached data pending flush, the checkpoint should not be saved
+     *                            - false: nothing is pending flush, the checkpoint can be saved safely
+     */
+    public void writeRecordCallback(Consumer writeResultConsumer) {
+        synchronized (writeLock) {
+            // The emptiness check, the log line and the callback all run while writeLock is held.
+            final boolean pending = !pendingFlushTables.isEmpty();
+
+            if (!pending) {
+                taplogger.trace("writeRecordCallback: No pending data, safe to save snapshot");
+            } else {
+                taplogger.debug("writeRecordCallback: Has pending data to flush, tables: {}", pendingFlushTables);
+            }
+
+            writeResultConsumer.accept(pending);
+        }
+    }
+
public void writeRecord(byte[] record) throws IOException {
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
@@ -859,13 +880,12 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
taplogger.info("Updated last flush time for table {}: {} -> {} (diff: {} ms)",
tableName, oldFlushTime, newFlushTime, newFlushTime - oldFlushTime);
- // 清理该表的缓存文件
- cleanupCacheFileForTable(tableName);
+ // 成功:清理该表的缓存文件(删除文件)
+ cleanupCacheFileForTable(tableName, true);
// 从待刷新列表中移除该表
pendingFlushTables.remove(tableName);
// 清理该表的批次大小
currentBatchSizeByTable.remove(tableName);
- cannotClean = false;
// 注意:刷新时间只在成功时更新,失败时不更新以便重试
return respContent;
} catch (StarrocksRetryableException e) {
@@ -874,7 +894,10 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
"flush_duration={} ms, error={}",
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
- cannotClean = true;
+
+ // 失败:保留缓存文件(不删除),仅关闭文件流
+ cleanupCacheFileForTable(tableName, false);
+
throw e;
} catch (Exception e) {
long flushEndTime = System.currentTimeMillis();
@@ -882,7 +905,10 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
"flush_duration={} ms, error={}",
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
- cannotClean = true;
+
+ // 失败:保留缓存文件(不删除),仅关闭文件流
+ cleanupCacheFileForTable(tableName, false);
+
throw new StarrocksRuntimeException(e);
} finally {
@@ -954,17 +980,17 @@ public void shutdown() {
// 清理所有Map,释放内存
cacheFileStreamsByTable.clear();
- if (!cannotClean) {
- // 清理所有表的缓存文件
- cleanupAllCacheFiles();
-
- tempCacheFilesByTable.clear();
- isFirstRecordByTable.clear();
- dataColumnsByTable.clear();
- pendingFlushTables.clear();
- currentBatchSizeByTable.clear();
- lastFlushTimeByTable.clear();
- }
+
+ // 清理所有表的缓存文件
+ cleanupAllCacheFiles();
+
+ tempCacheFilesByTable.clear();
+ isFirstRecordByTable.clear();
+ dataColumnsByTable.clear();
+ pendingFlushTables.clear();
+ currentBatchSizeByTable.clear();
+ lastFlushTimeByTable.clear();
+
// 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存
// 强制垃圾回收
@@ -1044,13 +1070,36 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
cacheFileStream = new FileOutputStream(tempCacheFile.toFile(), true);
cacheFileStreamsByTable.put(tableName, cacheFileStream);
}
- if(!cannotClean) {
- // 写入批次结束标记
+
+ // 检查文件是否已经有结束标记
+ boolean needsEndMarker = true;
+ if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) {
+ // 读取文件最后几个字节,检查是否已经有 ']'
+ byte[] lastBytes = new byte[10];
+ try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) {
+ long fileSize = Files.size(tempCacheFile);
+ long skipBytes = Math.max(0, fileSize - 10);
+ fis.skip(skipBytes);
+ int bytesRead = fis.read(lastBytes);
+ String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8);
+ needsEndMarker = !lastContent.trim().endsWith("]");
+ }
+ }
+
+ // 只在需要时写入结束标记
+ if (needsEndMarker) {
cacheFileStream.write(messageSerializer.batchEnd());
+ taplogger.debug("Added end marker ']' to cache file for table {}", tableName);
+ } else {
+ taplogger.debug("Cache file for table {} already has end marker, skipping", tableName);
}
+
cacheFileStream.flush();
cacheFileStream.close();
+ // 验证文件完整性
+ verifyFileCompleteness(tableName, tempCacheFile);
+
taplogger.debug("Finalized cache file for table {}: {}, size: {}",
tableName, tempCacheFile.toString(), formatBytes(Files.size(tempCacheFile)));
}
@@ -1060,10 +1109,35 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
}
}
+    /**
+     * Best-effort check that a finalized cache file ends with the JSON array terminator {@code ]}.
+     * The outcome is only logged; this method never throws and never modifies the file.
+     *
+     * @param tableName     table the cache file belongs to (used in log messages only)
+     * @param tempCacheFile path of the cache file to inspect; a missing or empty file is ignored
+     */
+    private void verifyFileCompleteness(String tableName, Path tempCacheFile) {
+        try {
+            if (!Files.exists(tempCacheFile)) {
+                return;
+            }
+            // Query the size once so the offset and the buffer length are computed from the same value.
+            long fileSize = Files.size(tempCacheFile);
+            if (fileSize <= 0) {
+                return;
+            }
+
+            // Read at most the last 100 bytes. seek + readFully positions exactly and fills the whole
+            // buffer, unlike InputStream.skip/read which may skip or read fewer bytes than requested.
+            byte[] tail = new byte[(int) Math.min(100, fileSize)];
+            try (java.io.RandomAccessFile raf = new java.io.RandomAccessFile(tempCacheFile.toFile(), "r")) {
+                raf.seek(fileSize - tail.length);
+                // Throws EOFException if the file shrank meanwhile; reported by the catch below.
+                raf.readFully(tail);
+            }
+            String lastContent = new String(tail, java.nio.charset.StandardCharsets.UTF_8);
+
+            if (lastContent.trim().endsWith("]")) {
+                taplogger.info("File verification passed for table {}: JSON is complete", tableName);
+            } else {
+                taplogger.warn("File verification FAILED for table {}: JSON is incomplete, last 100 chars: {}", tableName, lastContent);
+            }
+        } catch (Exception e) {
+            // Verification is diagnostic only, so any failure is downgraded to a warning.
+            taplogger.warn("Failed to verify file completeness for table {}: {}", tableName, e.getMessage());
+        }
+    }
+
/**
* 清理指定表的缓存文件
+ * @param tableName 表名
+ * @param deleteFile 是否删除文件(true=删除,false=仅关闭流但保留文件)
*/
- private void cleanupCacheFileForTable(String tableName) {
+ private void cleanupCacheFileForTable(String tableName, boolean deleteFile) {
try {
FileOutputStream cacheFileStream = cacheFileStreamsByTable.get(tableName);
Path tempCacheFile = tempCacheFilesByTable.get(tableName);
@@ -1081,18 +1155,31 @@ private void cleanupCacheFileForTable(String tableName) {
try {
long fileSize = Files.size(tempCacheFile);
- // 直接删除缓存文件
- Files.deleteIfExists(tempCacheFile);
-
- taplogger.info("=== File Cleanup Completed ===");
- taplogger.info("Table: {}", tableName);
- taplogger.info("Deleted File: {}", tempCacheFile.toString());
- taplogger.info("File Size: {}", formatBytes(fileSize));
- taplogger.info("==============================");
+ if (deleteFile) {
+ // 成功时删除文件
+ Files.deleteIfExists(tempCacheFile);
+ taplogger.info("=== File Cleanup Completed ===");
+ taplogger.info("Table: {}", tableName);
+ taplogger.info("Deleted File: {}", tempCacheFile.toString());
+ taplogger.info("File Size: {}", formatBytes(fileSize));
+ taplogger.info("==============================");
+ } else {
+ // 失败时保留文件
+ taplogger.warn("=== File Preserved for Debugging ===");
+ taplogger.warn("Table: {}", tableName);
+ taplogger.warn("Preserved File: {}", tempCacheFile.toString());
+ taplogger.warn("File Size: {}", formatBytes(fileSize));
+ taplogger.warn("Reason: Flush failed, file kept for troubleshooting");
+ taplogger.warn("=====================================");
+ }
} catch (IOException e) {
- taplogger.warn("Failed to delete cache file for table {}: {}", tableName, e.getMessage());
+ taplogger.warn("Failed to process cache file for table {}: {}", tableName, e.getMessage());
+ }
+
+ // 只有在删除文件时才从 map 中移除
+ if (deleteFile) {
+ tempCacheFilesByTable.remove(tableName);
}
- tempCacheFilesByTable.remove(tableName);
}
// 清理相关状态
@@ -1106,6 +1193,13 @@ private void cleanupCacheFileForTable(String tableName) {
}
}
+ /**
+ * Cleans up the cache file of the given table, deleting the file by default.
+ * Convenience overload that delegates to {@code cleanupCacheFileForTable(tableName, true)}.
+ *
+ * @param tableName name of the table whose cache file is cleaned up
+ */
+ private void cleanupCacheFileForTable(String tableName) {
+ cleanupCacheFileForTable(tableName, true);
+ }
+
/**
* 清理所有表的缓存文件
*/