From efce12648bf8ccfd7cc539cd7705193d822e4222 Mon Sep 17 00:00:00 2001 From: mnianqi Date: Wed, 4 Feb 2026 16:00:35 +0800 Subject: [PATCH] fix: TAP-9678 StarRocks cache data is incomplete --- connectors/starrocks-connector/pom.xml | 2 +- .../starrocks/StarrocksConnector.java | 7 + .../streamload/StarrocksStreamLoader.java | 154 ++++++++++++++---- 3 files changed, 132 insertions(+), 31 deletions(-) diff --git a/connectors/starrocks-connector/pom.xml b/connectors/starrocks-connector/pom.xml index a353a052a..81dd32a25 100644 --- a/connectors/starrocks-connector/pom.xml +++ b/connectors/starrocks-connector/pom.xml @@ -21,7 +21,7 @@ 31.0.1-jre 1.0-SNAPSHOT - 2.0.0-SNAPSHOT + 2.0.4-SNAPSHOT diff --git a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java index 7475028f5..0dcdf777e 100644 --- a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java +++ b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java @@ -161,6 +161,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec connectorFunctions.supportAlterFieldNameFunction(this::fieldDDLHandler); connectorFunctions.supportAlterFieldAttributesFunction(this::fieldDDLHandler); connectorFunctions.supportDropFieldFunction(this::fieldDDLHandler); + connectorFunctions.supportWriteRecordCallback(this::writeRecordCallback); } @@ -197,6 +198,12 @@ private void writeRecord(TapConnectorContext connectorContext, List writeResultConsumer) throws Throwable { + if (checkStreamLoad()) { + getStarrocksStreamLoader().writeRecordCallback(writeResultConsumer); + } + } + protected CreateTableOptions createStarrocksTable(TapConnectorContext connectorContext, TapCreateTableEvent createTableEvent) throws SQLException { TapTable tapTable = createTableEvent.getTable(); CreateTableOptions createTableOptions = new CreateTableOptions(); diff --git a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java index 6626e3484..f3f6a5b5e 100644 --- a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java +++ b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java @@ -89,7 +89,6 @@ public class StarrocksStreamLoader { private long lastMemoryCheckTime = 0; private static final long MEMORY_CHECK_INTERVAL = 30000; // 30秒检查一次内存 private final Log taplogger; - private boolean cannotClean; private AtomicReference globalException = new AtomicReference<>(); public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map httpClientMap, boolean useHttps, Log taplogger) { @@ -499,6 +498,28 @@ public void writeRecord(final List tapRecordEvents, final TapTab } } + /** + * 回调方法,用于判断是否有待刷新的缓存数据 + * + * @param writeResultConsumer 消费者,接收一个布尔值: + * - true: 表示有待刷新的缓存数据,不应保存断点 + * - false: 表示没有待刷新的数据,可以安全保存断点 + */ + public void writeRecordCallback(Consumer writeResultConsumer) { + synchronized (writeLock) { + // 检查是否有待刷新的表 + boolean hasPendingData = !pendingFlushTables.isEmpty(); + + if (hasPendingData) { + taplogger.debug("writeRecordCallback: Has pending data to flush, tables: {}", pendingFlushTables); + } else { + taplogger.trace("writeRecordCallback: No pending data, safe to save snapshot"); + } + + writeResultConsumer.accept(hasPendingData); + } + } + public void writeRecord(byte[] record) throws IOException { if (loadBatchFirstRecord) { loadBatchFirstRecord = false; @@ -859,13 +880,12 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks taplogger.info("Updated last flush time for table {}: {} -> {} (diff: {} ms)", tableName, oldFlushTime, newFlushTime, newFlushTime - oldFlushTime); - // 清理该表的缓存文件 - cleanupCacheFileForTable(tableName); + // 成功:清理该表的缓存文件(删除文件) + cleanupCacheFileForTable(tableName, true); // 从待刷新列表中移除该表 pendingFlushTables.remove(tableName); // 清理该表的批次大小 currentBatchSizeByTable.remove(tableName); - cannotClean = false; // 注意:刷新时间只在成功时更新,失败时不更新以便重试 return respContent; } catch (StarrocksRetryableException e) { @@ -874,7 +894,10 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " + "flush_duration={} ms, error={}", tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage()); - cannotClean = true; + + // 失败:保留缓存文件(不删除),仅关闭文件流 + cleanupCacheFileForTable(tableName, false); + throw e; } catch (Exception e) { long flushEndTime = System.currentTimeMillis(); @@ -882,7 +905,10 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " + "flush_duration={} ms, error={}", tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage()); - cannotClean = true; + + // 失败:保留缓存文件(不删除),仅关闭文件流 + cleanupCacheFileForTable(tableName, false); + throw new StarrocksRuntimeException(e); } finally { @@ -954,17 +980,17 @@ public void shutdown() { // 清理所有Map,释放内存 cacheFileStreamsByTable.clear(); - if (!cannotClean) { - // 清理所有表的缓存文件 - cleanupAllCacheFiles(); - - tempCacheFilesByTable.clear(); - isFirstRecordByTable.clear(); - dataColumnsByTable.clear(); - pendingFlushTables.clear(); - currentBatchSizeByTable.clear(); - lastFlushTimeByTable.clear(); - } + + // 清理所有表的缓存文件 + cleanupAllCacheFiles(); + + tempCacheFilesByTable.clear(); + isFirstRecordByTable.clear(); + dataColumnsByTable.clear(); + pendingFlushTables.clear(); + currentBatchSizeByTable.clear(); + lastFlushTimeByTable.clear(); + // 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存 // 强制垃圾回收 @@ -1044,13 +1070,36 @@ private void finalizeCacheFileForTable(String tableName) throws IOException { cacheFileStream = new FileOutputStream(tempCacheFile.toFile(), true); cacheFileStreamsByTable.put(tableName, cacheFileStream); } - if(!cannotClean) { - // 写入批次结束标记 + + // 检查文件是否已经有结束标记 + boolean needsEndMarker = true; + if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) { + // 读取文件最后几个字节,检查是否已经有 ']' + byte[] lastBytes = new byte[10]; + try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) { + long fileSize = Files.size(tempCacheFile); + long skipBytes = Math.max(0, fileSize - 10); + fis.skip(skipBytes); + int bytesRead = fis.read(lastBytes); + String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8); + needsEndMarker = !lastContent.trim().endsWith("]"); + } + } + + // 只在需要时写入结束标记 + if (needsEndMarker) { cacheFileStream.write(messageSerializer.batchEnd()); + taplogger.debug("Added end marker ']' to cache file for table {}", tableName); + } else { + taplogger.debug("Cache file for table {} already has end marker, skipping", tableName); } + cacheFileStream.flush(); cacheFileStream.close(); + // 验证文件完整性 + verifyFileCompleteness(tableName, tempCacheFile); + taplogger.debug("Finalized cache file for table {}: {}, size: {}", tableName, tempCacheFile.toString(), formatBytes(Files.size(tempCacheFile))); } @@ -1060,10 +1109,35 @@ private void finalizeCacheFileForTable(String tableName) throws IOException { } } + private void verifyFileCompleteness(String tableName, Path tempCacheFile) { + try { + if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) { + byte[] lastBytes = new byte[100]; + try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) { + long fileSize = Files.size(tempCacheFile); + long skipBytes = Math.max(0, fileSize - 100); + fis.skip(skipBytes); + int bytesRead = fis.read(lastBytes); + String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8); + + if (lastContent.trim().endsWith("]")) { + taplogger.info("File verification passed for table {}: JSON is complete", tableName); + } else { + taplogger.warn("File verification FAILED for table {}: JSON is incomplete, last 100 chars: {}", tableName, lastContent); + } + } + } + } catch (Exception e) { + taplogger.warn("Failed to verify file completeness for table {}: {}", tableName, e.getMessage()); + } + } + /** * 清理指定表的缓存文件 + * @param tableName 表名 + * @param deleteFile 是否删除文件(true=删除,false=仅关闭流但保留文件) */ - private void cleanupCacheFileForTable(String tableName) { + private void cleanupCacheFileForTable(String tableName, boolean deleteFile) { try { FileOutputStream cacheFileStream = cacheFileStreamsByTable.get(tableName); Path tempCacheFile = tempCacheFilesByTable.get(tableName); @@ -1081,18 +1155,31 @@ private void cleanupCacheFileForTable(String tableName) { try { long fileSize = Files.size(tempCacheFile); - // 直接删除缓存文件 - Files.deleteIfExists(tempCacheFile); - - taplogger.info("=== File Cleanup Completed ==="); - taplogger.info("Table: {}", tableName); - taplogger.info("Deleted File: {}", tempCacheFile.toString()); - taplogger.info("File Size: {}", formatBytes(fileSize)); - taplogger.info("=============================="); + if (deleteFile) { + // 成功时删除文件 + Files.deleteIfExists(tempCacheFile); + taplogger.info("=== File Cleanup Completed ==="); + taplogger.info("Table: {}", tableName); + taplogger.info("Deleted File: {}", tempCacheFile.toString()); + taplogger.info("File Size: {}", formatBytes(fileSize)); + taplogger.info("=============================="); + } else { + // 失败时保留文件 + taplogger.warn("=== File Preserved for Debugging ==="); + taplogger.warn("Table: {}", tableName); + taplogger.warn("Preserved File: {}", tempCacheFile.toString()); + taplogger.warn("File Size: {}", formatBytes(fileSize)); + taplogger.warn("Reason: Flush failed, file kept for troubleshooting"); + taplogger.warn("====================================="); + } } catch (IOException e) { - taplogger.warn("Failed to delete cache file for table {}: {}", tableName, e.getMessage()); + taplogger.warn("Failed to process cache file for table {}: {}", tableName, e.getMessage()); + } + + // 只有在删除文件时才从 map 中移除 + if (deleteFile) { + tempCacheFilesByTable.remove(tableName); } - tempCacheFilesByTable.remove(tableName); } // 清理相关状态 @@ -1106,6 +1193,13 @@ private void cleanupCacheFileForTable(String tableName) { } } + /** + * 清理指定表的缓存文件(默认删除文件) + */ + private void cleanupCacheFileForTable(String tableName) { + cleanupCacheFileForTable(tableName, true); + } + /** * 清理所有表的缓存文件 */