Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -883,20 +880,28 @@ private WriteListResult<TapRecordEvent> writeRecordsWithStreamWriteInternal(List
return result;

} catch (Exception e) {
// Check if it's ThreadGroup destroyed error and we can retry
if (retryCount < maxRetries && isThreadGroupDestroyedError(e)) {
connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt " + (retryCount + 1) + "/" + maxRetries + ")");
if (retryCount < maxRetries && (isThreadGroupDestroyedError(e) || isPaimonConflict(e))) {
if (isThreadGroupDestroyedError(e)) {
connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt {}/{})", retryCount + 1, maxRetries);
} else if (isPaimonConflict(e)) {
connectorContext.getLog().warn("Commit conflict detected, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e);
}
retryCount++;
// Completely rebuild all resources
reinitCatalog();
// Continue to next iteration for retry
TimeUnit.SECONDS.sleep(1L);
continue;
}

Throwable illegalThreadStateException = CommonUtils.matchThrowable(e, IllegalThreadStateException.class);
if (null != illegalThreadStateException) {
String message = String.format("Failed to write records to table %s occurred illegal thread state exception, current thread name: %s, thread group: %s",
tableName, Thread.currentThread().getName(), Thread.currentThread().getThreadGroup() != null ? Thread.currentThread().getThreadGroup().getName() : "null");
String message = String.format(
"Failed to write records to table %s occurred illegal thread state exception, current thread name: %s, thread group: %s",
tableName,
Thread.currentThread().getName(),
Thread.currentThread().getThreadGroup() != null
? Thread.currentThread().getThreadGroup().getName()
: "null"
);
throw new RuntimeException(message, e);
} else {
throw new RuntimeException("Failed to write records to table " + tableName, e);
Expand All @@ -905,6 +910,28 @@ private WriteListResult<TapRecordEvent> writeRecordsWithStreamWriteInternal(List
}
}

private boolean isPaimonConflict(Throwable e) {
Throwable t = e;
while (t != null) {
String msg = t.getMessage();
if (msg != null) {
if (msg.contains("File deletion conflicts detected")
|| msg.contains("Trying to delete file")
|| msg.contains("noConflictsOrFail")
|| msg.contains("assertNoDelete")) {
return true;
}
}
if (t instanceof IllegalStateException
&& msg != null
&& msg.contains("not previously added")) {
return true;
}
t = t.getCause();
}
return false;
}

/**
* Reinitialize the Paimon catalog.
* This is used to recover from ThreadGroup destroyed errors caused by classloader unloading.
Expand Down
Loading