From 3aaf105df74293625fbb5be3b5890defaaab8f01 Mon Sep 17 00:00:00 2001 From: mnianqi Date: Thu, 5 Feb 2026 17:36:02 +0800 Subject: [PATCH] fix: TAP-9678 StarRocks cache data is incomplete --- connectors/starrocks-connector/pom.xml | 2 +- .../starrocks/StarrocksConnector.java | 8 + .../streamload/StarrocksStreamLoader.java | 191 +++++++++++++++--- .../src/main/resources/spec_starrocks.json | 3 + 4 files changed, 173 insertions(+), 31 deletions(-) diff --git a/connectors/starrocks-connector/pom.xml b/connectors/starrocks-connector/pom.xml index c32e6374c..595f552da 100644 --- a/connectors/starrocks-connector/pom.xml +++ b/connectors/starrocks-connector/pom.xml @@ -21,7 +21,7 @@ 31.0.1-jre 1.0-SNAPSHOT - 2.0.5-SNAPSHOT + 2.0.6-SNAPSHOT diff --git a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java index be667cfd1..cc7bd6398 100644 --- a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java +++ b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java @@ -54,6 +54,7 @@ public class StarrocksConnector extends CommonDbConnector { private StarrocksJdbcContext starrocksJdbcContext; private StarrocksConfig starrocksConfig; private final Map starrocksStreamLoaderMap = new ConcurrentHashMap<>(); + private Consumer flushOffsetCallback; @Override @@ -77,6 +78,12 @@ public void onStart(TapConnectionContext tapConnectionContext) { fieldDDLHandlers.register(TapNewFieldEvent.class, this::newField); fieldDDLHandlers.register(TapAlterFieldAttributesEvent.class, this::alterFieldAttr); fieldDDLHandlers.register(TapDropFieldEvent.class, this::dropField); + + // 保存 flush offset callback + this.flushOffsetCallback = tapConnectionContext.getFlushOffsetCallback(); + if (this.flushOffsetCallback != null) { + tapLogger.info("Flush offset callback registered for StarRocks connector"); + } } @@ -178,6 +185,7 @@ public StarrocksStreamLoader getStarrocksStreamLoader() { if (!starrocksStreamLoaderMap.containsKey(threadName)) { StarrocksJdbcContext context = new StarrocksJdbcContext(starrocksConfig); StarrocksStreamLoader StarrocksStreamLoader = new StarrocksStreamLoader(context, new HashMap<>(), starrocksConfig.getUseHTTPS(), tapLogger); + StarrocksStreamLoader.setFlushOffsetCallback(flushOffsetCallback); starrocksStreamLoaderMap.put(threadName, StarrocksStreamLoader); } return starrocksStreamLoaderMap.get(threadName); diff --git a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java index 6626e3484..32a380780 100644 --- a/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java +++ b/connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java @@ -8,6 +8,7 @@ import io.tapdata.connector.starrocks.streamload.exception.StreamLoadException; import io.tapdata.connector.starrocks.streamload.rest.models.RespContent; import io.tapdata.connector.starrocks.util.MinuteWriteLimiter; +import io.tapdata.entity.event.TapCallbackOffset; import io.tapdata.entity.event.dml.TapDeleteRecordEvent; import io.tapdata.entity.event.dml.TapInsertRecordEvent; import io.tapdata.entity.event.dml.TapRecordEvent; @@ -76,6 +77,9 @@ public class StarrocksStreamLoader { // 表名到 TapTable 的映射,用于刷新时获取真正的 TapTable private final Map tableNameToTapTableMap; + // 保存每个表的最后一个 TapOffset,用于在 flush 成功后回调 + private final Map lastTapOffsetByTable; + // 日志打印控制 private long lastLogTime; private static final long LOG_INTERVAL_MS = 30 * 1000; // 30秒 @@ -91,6 +95,8 @@ public class StarrocksStreamLoader { private final Log taplogger; private boolean cannotClean; private AtomicReference globalException = new AtomicReference<>(); + // 回调 flush offset + private Consumer flushOffsetCallback; public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map httpClientMap, boolean useHttps, Log taplogger) { this.StarrocksConfig = (StarrocksConfig) StarrocksJdbcContext.getConfig(); @@ -123,6 +129,7 @@ public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map(); this.pendingFlushTables = ConcurrentHashMap.newKeySet(); this.tableNameToTapTableMap = new ConcurrentHashMap<>(); + this.lastTapOffsetByTable = new ConcurrentHashMap<>(); // 初始化定时刷新 initializeFlushScheduler(); @@ -425,6 +432,32 @@ public void writeRecord(final List tapRecordEvents, final TapTab long batchDataSize = 0; for (TapRecordEvent tapRecordEvent : tapRecordEvents) { + // 构建 TapOffset 对象,用于在 flush 成功后回调 + TapCallbackOffset tapOffset = new TapCallbackOffset(); + + // 从 TapRecordEvent.info 中提取 offset 信息 + // 这些信息由 HazelcastTargetPdkBaseNode.handleTapdataEventDML 方法添加 + Object batchOffset = tapRecordEvent.getInfo("batchOffset"); + Object streamOffset = tapRecordEvent.getInfo("streamOffset"); + Object syncStage = tapRecordEvent.getInfo("syncStage"); + Object sourceTime = tapRecordEvent.getInfo("sourceTime"); + Object nodeIds = tapRecordEvent.getInfo("nodeIds"); + + // 填充 TapOffset + tapOffset.batchOffset(batchOffset) + .streamOffset(streamOffset) + .tableId(tapRecordEvent.getTableId()) + .syncStage(syncStage != null ? syncStage.toString() : null) + .sourceTime(sourceTime instanceof Long ? (Long) sourceTime : null) + .eventTime(tapRecordEvent.getReferenceTime()) + .nodeIds(nodeIds); + + // 保存到 lastTapOffsetByTable,用于 flush 时回调 + // 只有当 offset 有效时才保存(避免覆盖之前的有效 offset) + if (tapOffset.hasValidOffset()) { + lastTapOffsetByTable.put(tableName, tapOffset); + } + byte[] bytes = messageSerializer.serialize(table, tapRecordEvent, isAgg); batchDataSize += bytes.length; @@ -859,14 +892,32 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks taplogger.info("Updated last flush time for table {}: {} -> {} (diff: {} ms)", tableName, oldFlushTime, newFlushTime, newFlushTime - oldFlushTime); - // 清理该表的缓存文件 - cleanupCacheFileForTable(tableName); + // 成功:清理该表的缓存文件(删除文件) + cleanupCacheFileForTable(tableName, true); // 从待刷新列表中移除该表 pendingFlushTables.remove(tableName); // 清理该表的批次大小 currentBatchSizeByTable.remove(tableName); - cannotClean = false; - // 注意:刷新时间只在成功时更新,失败时不更新以便重试 + + // 数据成功刷新后,主动通知引擎可以保存断点 + taplogger.debug("Table {} successfully flushed and removed from pending list. " + + "Remaining pending tables: {}", tableName, pendingFlushTables.size()); + + // 如果所有表都已刷新完成,并且有 TapOffset 数据,主动通知引擎保存断点 + if (pendingFlushTables.isEmpty() && flushOffsetCallback != null) { + TapCallbackOffset tapOffset = lastTapOffsetByTable.get(tableName); + if (tapOffset != null && tapOffset.hasValidOffset()) { + taplogger.info("All tables flushed successfully, triggering flush offset callback with TapOffset: {}", tapOffset); + try { + flushOffsetCallback.accept(tapOffset); + } catch (Exception e) { + taplogger.warn("Failed to flush offset callback: {}", e.getMessage(), e); + } + } else { + taplogger.debug("No valid TapOffset found for table {}, skipping callback", tableName); + } + } + return respContent; } catch (StarrocksRetryableException e) { long flushEndTime = System.currentTimeMillis(); @@ -874,7 +925,9 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " + "flush_duration={} ms, error={}", tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage()); - cannotClean = true; + + // 失败:保留缓存文件(不删除),仅关闭文件流 + cleanupCacheFileForTable(tableName, false); throw e; } catch (Exception e) { long flushEndTime = System.currentTimeMillis(); @@ -882,7 +935,9 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " + "flush_duration={} ms, error={}", tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage()); - cannotClean = true; + + // 失败:保留缓存文件(不删除),仅关闭文件流 + cleanupCacheFileForTable(tableName, false); throw new StarrocksRuntimeException(e); } finally { @@ -954,17 +1009,16 @@ public void shutdown() { // 清理所有Map,释放内存 cacheFileStreamsByTable.clear(); - if (!cannotClean) { - // 清理所有表的缓存文件 - cleanupAllCacheFiles(); - - tempCacheFilesByTable.clear(); - isFirstRecordByTable.clear(); - dataColumnsByTable.clear(); - pendingFlushTables.clear(); - currentBatchSizeByTable.clear(); - lastFlushTimeByTable.clear(); - } + + // 清理所有表的缓存文件 + cleanupAllCacheFiles(); + + tempCacheFilesByTable.clear(); + isFirstRecordByTable.clear(); + dataColumnsByTable.clear(); + pendingFlushTables.clear(); + currentBatchSizeByTable.clear(); + lastFlushTimeByTable.clear(); // 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存 // 强制垃圾回收 @@ -1044,13 +1098,36 @@ private void finalizeCacheFileForTable(String tableName) throws IOException { cacheFileStream = new FileOutputStream(tempCacheFile.toFile(), true); cacheFileStreamsByTable.put(tableName, cacheFileStream); } - if(!cannotClean) { - // 写入批次结束标记 + + // 检查文件是否已经有结束标记 + boolean needsEndMarker = true; + if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) { + // 读取文件最后几个字节,检查是否已经有 ']' + byte[] lastBytes = new byte[10]; + try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) { + long fileSize = Files.size(tempCacheFile); + long skipBytes = Math.max(0, fileSize - 10); + fis.skip(skipBytes); + int bytesRead = fis.read(lastBytes); + String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8); + needsEndMarker = !lastContent.trim().endsWith("]"); + } + } + + // 只在需要时写入结束标记 + if (needsEndMarker) { cacheFileStream.write(messageSerializer.batchEnd()); + taplogger.debug("Added end marker ']' to cache file for table {}", tableName); + } else { + taplogger.debug("Cache file for table {} already has end marker, skipping", tableName); } + cacheFileStream.flush(); cacheFileStream.close(); + // 验证文件完整性 + verifyFileCompleteness(tableName, tempCacheFile); + taplogger.debug("Finalized cache file for table {}: {}, size: {}", tableName, tempCacheFile.toString(), formatBytes(Files.size(tempCacheFile))); } @@ -1060,10 +1137,35 @@ private void finalizeCacheFileForTable(String tableName) throws IOException { } } + private void verifyFileCompleteness(String tableName, Path tempCacheFile) { + try { + if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) { + byte[] lastBytes = new byte[100]; + try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) { + long fileSize = Files.size(tempCacheFile); + long skipBytes = Math.max(0, fileSize - 100); + fis.skip(skipBytes); + int bytesRead = fis.read(lastBytes); + String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8); + + if (lastContent.trim().endsWith("]")) { + taplogger.info("File verification passed for table {}: JSON is complete", tableName); + } else { + taplogger.warn("File verification FAILED for table {}: JSON is incomplete, last 100 chars: {}", tableName, lastContent); + } + } + } + } catch (Exception e) { + taplogger.warn("Failed to verify file completeness for table {}: {}", tableName, e.getMessage()); + } + } + /** * 清理指定表的缓存文件 + * @param tableName 表名 + * @param deleteFile 是否删除文件(true=删除,false=仅关闭流但保留文件) */ - private void cleanupCacheFileForTable(String tableName) { + private void cleanupCacheFileForTable(String tableName, boolean deleteFile) { try { FileOutputStream cacheFileStream = cacheFileStreamsByTable.get(tableName); Path tempCacheFile = tempCacheFilesByTable.get(tableName); @@ -1081,18 +1183,31 @@ private void cleanupCacheFileForTable(String tableName) { try { long fileSize = Files.size(tempCacheFile); - // 直接删除缓存文件 - Files.deleteIfExists(tempCacheFile); - - taplogger.info("=== File Cleanup Completed ==="); - taplogger.info("Table: {}", tableName); - taplogger.info("Deleted File: {}", tempCacheFile.toString()); - taplogger.info("File Size: {}", formatBytes(fileSize)); - taplogger.info("=============================="); + if (deleteFile) { + // 成功时删除文件 + Files.deleteIfExists(tempCacheFile); + taplogger.info("=== File Cleanup Completed ==="); + taplogger.info("Table: {}", tableName); + taplogger.info("Deleted File: {}", tempCacheFile.toString()); + taplogger.info("File Size: {}", formatBytes(fileSize)); + taplogger.info("=============================="); + } else { + // 失败时保留文件 + taplogger.warn("=== File Preserved for Debugging ==="); + taplogger.warn("Table: {}", tableName); + taplogger.warn("Preserved File: {}", tempCacheFile.toString()); + taplogger.warn("File Size: {}", formatBytes(fileSize)); + taplogger.warn("Reason: Flush failed, file kept for troubleshooting"); + taplogger.warn("====================================="); + } } catch (IOException e) { - taplogger.warn("Failed to delete cache file for table {}: {}", tableName, e.getMessage()); + taplogger.warn("Failed to process cache file for table {}: {}", tableName, e.getMessage()); + } + + // 只有在删除文件时才从 map 中移除 + if (deleteFile) { + tempCacheFilesByTable.remove(tableName); } - tempCacheFilesByTable.remove(tableName); } // 清理相关状态 @@ -1106,6 +1221,13 @@ private void cleanupCacheFileForTable(String tableName) { } } + /** + * 清理指定表的缓存文件(默认删除文件) + */ + private void cleanupCacheFileForTable(String tableName) { + cleanupCacheFileForTable(tableName, true); + } + /** * 清理所有表的缓存文件 */ @@ -1306,4 +1428,13 @@ public WriteListResult createResultList() { return result; } } + + /** + * 设置 flush offset callback + * + * @param flushOffsetCallback 回调函数,在数据成功刷新后调用 + */ + public void setFlushOffsetCallback(Consumer flushOffsetCallback) { + this.flushOffsetCallback = flushOffsetCallback; + } } diff --git a/connectors/starrocks-connector/src/main/resources/spec_starrocks.json b/connectors/starrocks-connector/src/main/resources/spec_starrocks.json index 16ce1b462..95fee1805 100644 --- a/connectors/starrocks-connector/src/main/resources/spec_starrocks.json +++ b/connectors/starrocks-connector/src/main/resources/spec_starrocks.json @@ -15,6 +15,9 @@ "update_on_exists", "just_insert" ] + }, + { + "id": "flush_offset_callback" } ], "connection": {