Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connectors/starrocks-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<guava.version>31.0.1-jre</guava.version>
<mysql.core.version>1.0-SNAPSHOT</mysql.core.version>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.4-SNAPSHOT</tapdata.pdk.api.version>
</properties>
<!--<dependencyManagement>-->
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportAlterFieldNameFunction(this::fieldDDLHandler);
connectorFunctions.supportAlterFieldAttributesFunction(this::fieldDDLHandler);
connectorFunctions.supportDropFieldFunction(this::fieldDDLHandler);
connectorFunctions.supportWriteRecordCallback(this::writeRecordCallback);

}

Expand Down Expand Up @@ -197,6 +198,12 @@ private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEve
}
}

/**
 * Forwards the write-record callback to the stream loader.
 *
 * <p>When {@code checkStreamLoad()} returns {@code false} this method returns
 * without invoking {@code writeResultConsumer}.
 *
 * @param connectorContext    connector context supplied by the framework (not read here)
 * @param writeResultConsumer consumer handed to the stream loader's {@code writeRecordCallback}
 * @throws Throwable declared for the callback signature; propagates anything raised by the calls below
 */
private void writeRecordCallback(TapConnectorContext connectorContext, Consumer<Boolean> writeResultConsumer) throws Throwable {
    // Guard clause: nothing to report unless the stream-load check passes.
    if (!checkStreamLoad()) {
        return;
    }
    getStarrocksStreamLoader().writeRecordCallback(writeResultConsumer);
}

protected CreateTableOptions createStarrocksTable(TapConnectorContext connectorContext, TapCreateTableEvent createTableEvent) throws SQLException {
TapTable tapTable = createTableEvent.getTable();
CreateTableOptions createTableOptions = new CreateTableOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public class StarrocksStreamLoader {
private long lastMemoryCheckTime = 0;
private static final long MEMORY_CHECK_INTERVAL = 30000; // 30秒检查一次内存
private final Log taplogger;
private boolean cannotClean;
private AtomicReference<Exception> globalException = new AtomicReference<>();

public StarrocksStreamLoader(StarrocksJdbcContext StarrocksJdbcContext, Map<String, CloseableHttpClient> httpClientMap, boolean useHttps, Log taplogger) {
Expand Down Expand Up @@ -499,6 +498,28 @@ public void writeRecord(final List<TapRecordEvent> tapRecordEvents, final TapTab
}
}

/**
 * Callback used to report whether cached data is still waiting to be flushed.
 *
 * <p>Both the check and the consumer invocation run while holding {@code writeLock}.
 *
 * @param writeResultConsumer consumer that receives a single boolean:
 *                            {@code true}  - there is cached data pending flush, the checkpoint should not be saved;
 *                            {@code false} - nothing is pending, the checkpoint can be saved safely
 */
public void writeRecordCallback(Consumer<Boolean> writeResultConsumer) {
    synchronized (writeLock) {
        // Data is pending exactly when at least one table is queued for flushing.
        final boolean flushOutstanding = !pendingFlushTables.isEmpty();

        if (!flushOutstanding) {
            taplogger.trace("writeRecordCallback: No pending data, safe to save snapshot");
        } else {
            taplogger.debug("writeRecordCallback: Has pending data to flush, tables: {}", pendingFlushTables);
        }

        writeResultConsumer.accept(flushOutstanding);
    }
}

public void writeRecord(byte[] record) throws IOException {
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
Expand Down Expand Up @@ -859,13 +880,12 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
taplogger.info("Updated last flush time for table {}: {} -> {} (diff: {} ms)",
tableName, oldFlushTime, newFlushTime, newFlushTime - oldFlushTime);

// 清理该表的缓存文件
cleanupCacheFileForTable(tableName);
// 成功:清理该表的缓存文件(删除文件)
cleanupCacheFileForTable(tableName, true);
// 从待刷新列表中移除该表
pendingFlushTables.remove(tableName);
// 清理该表的批次大小
currentBatchSizeByTable.remove(tableName);
cannotClean = false;
// 注意:刷新时间只在成功时更新,失败时不更新以便重试
return respContent;
} catch (StarrocksRetryableException e) {
Expand All @@ -874,15 +894,21 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
"flush_duration={} ms, error={}",
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
cannotClean = true;

// 失败:保留缓存文件(不删除),仅关闭文件流
cleanupCacheFileForTable(tableName, false);

throw e;
} catch (Exception e) {
long flushEndTime = System.currentTimeMillis();
long flushDuration = flushEndTime - flushStartTime;
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
"flush_duration={} ms, error={}",
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
cannotClean = true;

// 失败:保留缓存文件(不删除),仅关闭文件流
cleanupCacheFileForTable(tableName, false);

throw new StarrocksRuntimeException(e);
} finally {

Expand Down Expand Up @@ -954,17 +980,17 @@ public void shutdown() {

// 清理所有Map,释放内存
cacheFileStreamsByTable.clear();
if (!cannotClean) {
// 清理所有表的缓存文件
cleanupAllCacheFiles();

tempCacheFilesByTable.clear();
isFirstRecordByTable.clear();
dataColumnsByTable.clear();
pendingFlushTables.clear();
currentBatchSizeByTable.clear();
lastFlushTimeByTable.clear();
}

// 清理所有表的缓存文件
cleanupAllCacheFiles();

tempCacheFilesByTable.clear();
isFirstRecordByTable.clear();
dataColumnsByTable.clear();
pendingFlushTables.clear();
currentBatchSizeByTable.clear();
lastFlushTimeByTable.clear();

// 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存

// 强制垃圾回收
Expand Down Expand Up @@ -1044,13 +1070,36 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
cacheFileStream = new FileOutputStream(tempCacheFile.toFile(), true);
cacheFileStreamsByTable.put(tableName, cacheFileStream);
}
if(!cannotClean) {
// 写入批次结束标记

// 检查文件是否已经有结束标记
boolean needsEndMarker = true;
if (Files.exists(tempCacheFile) && Files.size(tempCacheFile) > 0) {
// 读取文件最后几个字节,检查是否已经有 ']'
byte[] lastBytes = new byte[10];
try (FileInputStream fis = new FileInputStream(tempCacheFile.toFile())) {
long fileSize = Files.size(tempCacheFile);
long skipBytes = Math.max(0, fileSize - 10);
fis.skip(skipBytes);
int bytesRead = fis.read(lastBytes);
String lastContent = new String(lastBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8);
needsEndMarker = !lastContent.trim().endsWith("]");
}
}

// 只在需要时写入结束标记
if (needsEndMarker) {
cacheFileStream.write(messageSerializer.batchEnd());
taplogger.debug("Added end marker ']' to cache file for table {}", tableName);
} else {
taplogger.debug("Cache file for table {} already has end marker, skipping", tableName);
}

cacheFileStream.flush();
cacheFileStream.close();

// 验证文件完整性
verifyFileCompleteness(tableName, tempCacheFile);

taplogger.debug("Finalized cache file for table {}: {}, size: {}",
tableName, tempCacheFile.toString(), formatBytes(Files.size(tempCacheFile)));
}
Expand All @@ -1060,10 +1109,35 @@ private void finalizeCacheFileForTable(String tableName) throws IOException {
}
}

/**
 * Checks whether a table's cache file ends with the JSON array terminator {@code ]}
 * and logs the result. This is a best-effort diagnostic: it never throws, and any
 * failure while inspecting the file is logged as a warning.
 *
 * <p>Missing or empty files are silently ignored.
 *
 * @param tableName     table name, used only in log messages
 * @param tempCacheFile path of the cache file to inspect
 */
private void verifyFileCompleteness(String tableName, Path tempCacheFile) {
    final int maxTailBytes = 100;
    try {
        if (!Files.exists(tempCacheFile)) {
            return;
        }
        // seek() + readFully() always deliver exactly the requested tail, whereas
        // InputStream.skip()/read() may process fewer bytes than asked (or return -1),
        // which could make the check inspect the wrong bytes.
        try (java.io.RandomAccessFile raf = new java.io.RandomAccessFile(tempCacheFile.toFile(), "r")) {
            long fileSize = raf.length();
            if (fileSize <= 0) {
                return;
            }
            int tailLength = (int) Math.min(maxTailBytes, fileSize);
            byte[] lastBytes = new byte[tailLength];
            raf.seek(fileSize - tailLength);
            raf.readFully(lastBytes);
            String lastContent = new String(lastBytes, java.nio.charset.StandardCharsets.UTF_8);

            if (lastContent.trim().endsWith("]")) {
                taplogger.info("File verification passed for table {}: JSON is complete", tableName);
            } else {
                taplogger.warn("File verification FAILED for table {}: JSON is incomplete, last 100 chars: {}", tableName, lastContent);
            }
        }
    } catch (Exception e) {
        // Verification is diagnostic only; do not let it break the flush path.
        taplogger.warn("Failed to verify file completeness for table {}: {}", tableName, e.getMessage());
    }
}

/**
* 清理指定表的缓存文件
* @param tableName 表名
* @param deleteFile 是否删除文件(true=删除,false=仅关闭流但保留文件)
*/
private void cleanupCacheFileForTable(String tableName) {
private void cleanupCacheFileForTable(String tableName, boolean deleteFile) {
try {
FileOutputStream cacheFileStream = cacheFileStreamsByTable.get(tableName);
Path tempCacheFile = tempCacheFilesByTable.get(tableName);
Expand All @@ -1081,18 +1155,31 @@ private void cleanupCacheFileForTable(String tableName) {
try {
long fileSize = Files.size(tempCacheFile);

// 直接删除缓存文件
Files.deleteIfExists(tempCacheFile);

taplogger.info("=== File Cleanup Completed ===");
taplogger.info("Table: {}", tableName);
taplogger.info("Deleted File: {}", tempCacheFile.toString());
taplogger.info("File Size: {}", formatBytes(fileSize));
taplogger.info("==============================");
if (deleteFile) {
// 成功时删除文件
Files.deleteIfExists(tempCacheFile);
taplogger.info("=== File Cleanup Completed ===");
taplogger.info("Table: {}", tableName);
taplogger.info("Deleted File: {}", tempCacheFile.toString());
taplogger.info("File Size: {}", formatBytes(fileSize));
taplogger.info("==============================");
} else {
// 失败时保留文件
taplogger.warn("=== File Preserved for Debugging ===");
taplogger.warn("Table: {}", tableName);
taplogger.warn("Preserved File: {}", tempCacheFile.toString());
taplogger.warn("File Size: {}", formatBytes(fileSize));
taplogger.warn("Reason: Flush failed, file kept for troubleshooting");
taplogger.warn("=====================================");
}
} catch (IOException e) {
taplogger.warn("Failed to delete cache file for table {}: {}", tableName, e.getMessage());
taplogger.warn("Failed to process cache file for table {}: {}", tableName, e.getMessage());
}

// 只有在删除文件时才从 map 中移除
if (deleteFile) {
tempCacheFilesByTable.remove(tableName);
}
tempCacheFilesByTable.remove(tableName);
}

// 清理相关状态
Expand All @@ -1106,6 +1193,13 @@ private void cleanupCacheFileForTable(String tableName) {
}
}

/**
 * Cleans up the cache file of the given table, deleting the file by default.
 *
 * @param tableName name of the table whose cache file is cleaned up
 */
private void cleanupCacheFileForTable(String tableName) {
    // Default behaviour of the single-argument overload: delete the file.
    final boolean removeFromDisk = true;
    cleanupCacheFileForTable(tableName, removeFromDisk);
}

/**
* 清理所有表的缓存文件
*/
Expand Down
Loading