Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connectors/starrocks-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<guava.version>31.0.1-jre</guava.version>
<mysql.core.version>1.0-SNAPSHOT</mysql.core.version>
<tapdata.pdk.api.version>2.0.5-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.6-SNAPSHOT</tapdata.pdk.api.version>
</properties>
<!--<dependencyManagement>-->
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class StarrocksConnector extends CommonDbConnector {
private StarrocksJdbcContext starrocksJdbcContext;
private StarrocksConfig starrocksConfig;
private final Map<String, StarrocksStreamLoader> starrocksStreamLoaderMap = new ConcurrentHashMap<>();
private Consumer<Object> flushOffsetCallback;


@Override
Expand All @@ -77,6 +78,12 @@ public void onStart(TapConnectionContext tapConnectionContext) {
fieldDDLHandlers.register(TapNewFieldEvent.class, this::newField);
fieldDDLHandlers.register(TapAlterFieldAttributesEvent.class, this::alterFieldAttr);
fieldDDLHandlers.register(TapDropFieldEvent.class, this::dropField);

// 保存 flush offset callback
this.flushOffsetCallback = tapConnectionContext.getFlushOffsetCallback();
if (this.flushOffsetCallback != null) {
tapLogger.info("Flush offset callback registered for StarRocks connector");
}
}


Expand Down Expand Up @@ -178,6 +185,7 @@ public StarrocksStreamLoader getStarrocksStreamLoader() {
if (!starrocksStreamLoaderMap.containsKey(threadName)) {
StarrocksJdbcContext context = new StarrocksJdbcContext(starrocksConfig);
StarrocksStreamLoader StarrocksStreamLoader = new StarrocksStreamLoader(context, new HashMap<>(), starrocksConfig.getUseHTTPS(), tapLogger);
StarrocksStreamLoader.setFlushOffsetCallback(flushOffsetCallback);
starrocksStreamLoaderMap.put(threadName, StarrocksStreamLoader);
}
return starrocksStreamLoaderMap.get(threadName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.tapdata.connector.starrocks.streamload.exception.StreamLoadException;
import io.tapdata.connector.starrocks.streamload.rest.models.RespContent;
import io.tapdata.connector.starrocks.util.MinuteWriteLimiter;
import io.tapdata.entity.event.TapCallbackOffset;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
import io.tapdata.entity.event.dml.TapInsertRecordEvent;
import io.tapdata.entity.event.dml.TapRecordEvent;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class StarrocksStreamLoader {
// 表名到 TapTable 的映射,用于刷新时获取真正的 TapTable
private final Map<String, TapTable> tableNameToTapTableMap;

// 保存每个表的最后一个 TapOffset,用于在 flush 成功后回调
private final Map<String, TapCallbackOffset> lastTapOffsetByTable;

// 日志打印控制
private long lastLogTime;
private static final long LOG_INTERVAL_MS = 30 * 1000; // 30秒
Expand All @@ -91,6 +95,8 @@ public class StarrocksStreamLoader {
private final Log taplogger;
private boolean cannotClean;
private AtomicReference<Exception> globalException = new AtomicReference<>();
// 回调 flush offset
private Consumer<Object> flushOffsetCallback;

public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map<String, CloseableHttpClient> httpClientMap, boolean useHttps, Log taplogger) {
this.StarrocksConfig = (StarrocksConfig) StarrocksJdbcContext.getConfig();
Expand Down Expand Up @@ -123,6 +129,7 @@ public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map<Stri
this.isFirstRecordByTable = new ConcurrentHashMap<>();
this.pendingFlushTables = ConcurrentHashMap.newKeySet();
this.tableNameToTapTableMap = new ConcurrentHashMap<>();
this.lastTapOffsetByTable = new ConcurrentHashMap<>();

// 初始化定时刷新
initializeFlushScheduler();
Expand Down Expand Up @@ -425,6 +432,32 @@ public void writeRecord(final List<TapRecordEvent> tapRecordEvents, final TapTab
long batchDataSize = 0;

for (TapRecordEvent tapRecordEvent : tapRecordEvents) {
// 构建 TapOffset 对象,用于在 flush 成功后回调
TapCallbackOffset tapOffset = new TapCallbackOffset();

// 从 TapRecordEvent.info 中提取 offset 信息
// 这些信息由 HazelcastTargetPdkBaseNode.handleTapdataEventDML 方法添加
Object batchOffset = tapRecordEvent.getInfo("batchOffset");
Object streamOffset = tapRecordEvent.getInfo("streamOffset");
Object syncStage = tapRecordEvent.getInfo("syncStage");
Object sourceTime = tapRecordEvent.getInfo("sourceTime");
Object nodeIds = tapRecordEvent.getInfo("nodeIds");

// 填充 TapOffset
tapOffset.batchOffset(batchOffset)
.streamOffset(streamOffset)
.tableId(tapRecordEvent.getTableId())
.syncStage(syncStage != null ? syncStage.toString() : null)
.sourceTime(sourceTime instanceof Long ? (Long) sourceTime : null)
.eventTime(tapRecordEvent.getReferenceTime())
.nodeIds(nodeIds);

// 保存到 lastTapOffsetByTable,用于 flush 时回调
// 只有当 offset 有效时才保存(避免覆盖之前的有效 offset)
if (tapOffset.hasValidOffset()) {
lastTapOffsetByTable.put(tableName, tapOffset);
}

byte[] bytes = messageSerializer.serialize(table, tapRecordEvent, isAgg);
batchDataSize += bytes.length;

Expand Down Expand Up @@ -859,30 +892,52 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
taplogger.info("Updated last flush time for table {}: {} -> {} (diff: {} ms)",
tableName, oldFlushTime, newFlushTime, newFlushTime - oldFlushTime);

// 清理该表的缓存文件
cleanupCacheFileForTable(tableName);
// 成功:清理该表的缓存文件(删除文件)
cleanupCacheFileForTable(tableName, true);
// 从待刷新列表中移除该表
pendingFlushTables.remove(tableName);
// 清理该表的批次大小
currentBatchSizeByTable.remove(tableName);
cannotClean = false;
// 注意:刷新时间只在成功时更新,失败时不更新以便重试

// 数据成功刷新后,主动通知引擎可以保存断点
taplogger.debug("Table {} successfully flushed and removed from pending list. " +
"Remaining pending tables: {}", tableName, pendingFlushTables.size());

// 如果所有表都已刷新完成,并且有 TapOffset 数据,主动通知引擎保存断点
if (pendingFlushTables.isEmpty() && flushOffsetCallback != null) {
TapCallbackOffset tapOffset = lastTapOffsetByTable.get(tableName);
if (tapOffset != null && tapOffset.hasValidOffset()) {
taplogger.info("All tables flushed successfully, triggering flush offset callback with TapOffset: {}", tapOffset);
try {
flushOffsetCallback.accept(tapOffset);
} catch (Exception e) {
taplogger.warn("Failed to flush offset callback: {}", e.getMessage(), e);
}
} else {
taplogger.debug("No valid TapOffset found for table {}, skipping callback", tableName);
}
}

return respContent;
} catch (StarrocksRetryableException e) {
long flushEndTime = System.currentTimeMillis();
long flushDuration = flushEndTime - flushStartTime;
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
"flush_duration={} ms, error={}",
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
cannotClean = true;

// 失败:保留缓存文件(不删除),仅关闭文件流
cleanupCacheFileForTable(tableName, false);
throw e;
} catch (Exception e) {
long flushEndTime = System.currentTimeMillis();
long flushDuration = flushEndTime - flushStartTime;
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
"flush_duration={} ms, error={}",
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
cannotClean = true;

// 失败:保留缓存文件(不删除),仅关闭文件流
cleanupCacheFileForTable(tableName, false);
throw new StarrocksRuntimeException(e);
} finally {

Expand Down Expand Up @@ -954,17 +1009,16 @@ public void shutdown() {

// 清理所有Map,释放内存
cacheFileStreamsByTable.clear();
if (!cannotClean) {
// 清理所有表的缓存文件
cleanupAllCacheFiles();

tempCacheFilesByTable.clear();
isFirstRecordByTable.clear();
dataColumnsByTable.clear();
pendingFlushTables.clear();
currentBatchSizeByTable.clear();
lastFlushTimeByTable.clear();
}

// 清理所有表的缓存文件
cleanupAllCacheFiles();

tempCacheFilesByTable.clear();
isFirstRecordByTable.clear();
dataColumnsByTable.clear();
pendingFlushTables.clear();
currentBatchSizeByTable.clear();
lastFlushTimeByTable.clear();
// 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存

// 强制垃圾回收
Expand Down Expand Up @@ -1044,13 +1098,36 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
cacheFileStream = new FileOutputStream(tempCacheFile.toFile(), true);
cacheFileStreamsByTable.put(tableName, cacheFileStream);
}
if(!cannotClean) {
// 写入批次结束标记

// 检查文件是否已经有结束标记
boolean needsEndMarker = true;
if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) {
// 读取文件最后几个字节,检查是否已经有 ']'
byte[] lastBytes = new byte[10];
try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) {
long fileSize = Files.size(tempCacheFile);
long skipBytes = Math.max(0, fileSize - 10);
fis.skip(skipBytes);
int bytesRead = fis.read(lastBytes);
String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8);
needsEndMarker = !lastContent.trim().endsWith("]");
}
}

// 只在需要时写入结束标记
if (needsEndMarker) {
cacheFileStream.write(messageSerializer.batchEnd());
taplogger.debug("Added end marker ']' to cache file for table {}", tableName);
} else {
taplogger.debug("Cache file for table {} already has end marker, skipping", tableName);
}

cacheFileStream.flush();
cacheFileStream.close();

// 验证文件完整性
verifyFileCompleteness(tableName, tempCacheFile);

taplogger.debug("Finalized cache file for table {}: {}, size: {}",
tableName, tempCacheFile.toString(), formatBytes(Files.size(tempCacheFile)));
}
Expand All @@ -1060,10 +1137,35 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
}
}

/**
 * Logs whether the cache file of a table ends with the closing bracket {@code ]}.
 *
 * <p>Only the tail of the file (at most 100 bytes) is inspected. The check is purely
 * diagnostic: the outcome is reported through {@code taplogger}, and any failure while
 * inspecting the file is logged as a warning instead of being propagated.</p>
 *
 * @param tableName     table name, used only in the log messages
 * @param tempCacheFile cache file to inspect; nothing is logged when it is missing or empty
 */
private void verifyFileCompleteness(String tableName, Path tempCacheFile) {
    try {
        if (!Files.exists(tempCacheFile)) {
            return;
        }
        // Read the size once so the emptiness check and the tail offset agree.
        long fileSize = Files.size(tempCacheFile);
        if (fileSize <= 0) {
            return;
        }

        byte[] lastBytes = new byte[100];
        int bytesRead = 0;
        try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) {
            // Position through the channel: InputStream.skip may skip fewer bytes than
            // requested, which would make the check look at the wrong part of the file.
            fis.getChannel().position(Math.max(0, fileSize - lastBytes.length));

            // A single read may return fewer bytes than available, or -1 at end of file,
            // so keep reading until the buffer is full or the stream is exhausted.
            while (bytesRead < lastBytes.length) {
                int n = fis.read(lastBytes, bytesRead, lastBytes.length - bytesRead);
                if (n < 0) {
                    break;
                }
                bytesRead += n;
            }
        }

        String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8);
        if (lastContent.trim().endsWith("]")) {
            taplogger.info("File verification passed for table {}: JSON is complete", tableName);
        } else {
            taplogger.warn("File verification FAILED for table {}: JSON is incomplete, last 100 chars: {}", tableName, lastContent);
        }
    } catch (Exception e) {
        // Best-effort diagnostic only: never let a verification problem break the caller.
        taplogger.warn("Failed to verify file completeness for table {}: {}", tableName, e.getMessage());
    }
}

/**
* 清理指定表的缓存文件
* @param tableName 表名
* @param deleteFile 是否删除文件(true=删除,false=仅关闭流但保留文件)
*/
private void cleanupCacheFileForTable(String tableName) {
private void cleanupCacheFileForTable(String tableName, boolean deleteFile) {
try {
FileOutputStream cacheFileStream = cacheFileStreamsByTable.get(tableName);
Path tempCacheFile = tempCacheFilesByTable.get(tableName);
Expand All @@ -1081,18 +1183,31 @@ private void cleanupCacheFileForTable(String tableName) {
try {
long fileSize = Files.size(tempCacheFile);

// 直接删除缓存文件
Files.deleteIfExists(tempCacheFile);

taplogger.info("=== File Cleanup Completed ===");
taplogger.info("Table: {}", tableName);
taplogger.info("Deleted File: {}", tempCacheFile.toString());
taplogger.info("File Size: {}", formatBytes(fileSize));
taplogger.info("==============================");
if (deleteFile) {
// 成功时删除文件
Files.deleteIfExists(tempCacheFile);
taplogger.info("=== File Cleanup Completed ===");
taplogger.info("Table: {}", tableName);
taplogger.info("Deleted File: {}", tempCacheFile.toString());
taplogger.info("File Size: {}", formatBytes(fileSize));
taplogger.info("==============================");
} else {
// 失败时保留文件
taplogger.warn("=== File Preserved for Debugging ===");
taplogger.warn("Table: {}", tableName);
taplogger.warn("Preserved File: {}", tempCacheFile.toString());
taplogger.warn("File Size: {}", formatBytes(fileSize));
taplogger.warn("Reason: Flush failed, file kept for troubleshooting");
taplogger.warn("=====================================");
}
} catch (IOException e) {
taplogger.warn("Failed to delete cache file for table {}: {}", tableName, e.getMessage());
taplogger.warn("Failed to process cache file for table {}: {}", tableName, e.getMessage());
}

// 只有在删除文件时才从 map 中移除
if (deleteFile) {
tempCacheFilesByTable.remove(tableName);
}
tempCacheFilesByTable.remove(tableName);
}

// 清理相关状态
Expand All @@ -1106,6 +1221,13 @@ private void cleanupCacheFileForTable(String tableName) {
}
}

/**
 * Cleans up the cache file of the given table with the default policy, i.e. it delegates
 * to {@code cleanupCacheFileForTable(tableName, true)} so the file is deleted.
 *
 * @param tableName name of the table whose cache file is cleaned up
 */
private void cleanupCacheFileForTable(String tableName) {
    final boolean deleteFile = true;
    cleanupCacheFileForTable(tableName, deleteFile);
}

/**
* 清理所有表的缓存文件
*/
Expand Down Expand Up @@ -1306,4 +1428,13 @@ public WriteListResult<TapRecordEvent> createResultList() {
return result;
}
}

/**
 * Sets the flush offset callback.
 *
 * <p>The reference is stored as-is in this loader; no validation is performed here.</p>
 *
 * @param flushOffsetCallback callback to invoke after data has been flushed successfully
 */
public void setFlushOffsetCallback(Consumer<Object> flushOffsetCallback) {
this.flushOffsetCallback = flushOffsetCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"update_on_exists",
"just_insert"
]
},
{
"id": "flush_offset_callback"
}
],
"connection": {
Expand Down
Loading