diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java index 2c47f035..98195610 100644 --- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java @@ -41,10 +41,7 @@ import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -883,20 +880,28 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List return result; } catch (Exception e) { - // Check if it's ThreadGroup destroyed error and we can retry - if (retryCount < maxRetries && isThreadGroupDestroyedError(e)) { - connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt " + (retryCount + 1) + "/" + maxRetries + ")"); + if (retryCount < maxRetries && (isThreadGroupDestroyedError(e) || isPaimonConflict(e))) { + if (isThreadGroupDestroyedError(e)) { + connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt {}/{})", retryCount + 1, maxRetries); + } else if (isPaimonConflict(e)) { + connectorContext.getLog().warn("Commit conflict detected, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e); + } retryCount++; - // Completely rebuild all resources reinitCatalog(); - // Continue to next iteration for retry TimeUnit.SECONDS.sleep(1L); continue; } + Throwable illegalThreadStateException = CommonUtils.matchThrowable(e, IllegalThreadStateException.class); if (null != illegalThreadStateException) { - String message = String.format("Failed to write records to table %s occurred illegal thread state exception, current thread name: %s, thread group: %s", - tableName, Thread.currentThread().getName(), Thread.currentThread().getThreadGroup() != null ? Thread.currentThread().getThreadGroup().getName() : "null"); + String message = String.format( + "Failed to write records to table %s occurred illegal thread state exception, current thread name: %s, thread group: %s", + tableName, + Thread.currentThread().getName(), + Thread.currentThread().getThreadGroup() != null + ? Thread.currentThread().getThreadGroup().getName() + : "null" + ); throw new RuntimeException(message, e); } else { throw new RuntimeException("Failed to write records to table " + tableName, e); @@ -905,6 +910,28 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List } } + private boolean isPaimonConflict(Throwable e) { + Throwable t = e; + while (t != null) { + String msg = t.getMessage(); + if (msg != null) { + if (msg.contains("File deletion conflicts detected") + || msg.contains("Trying to delete file") + || msg.contains("noConflictsOrFail") + || msg.contains("assertNoDelete")) { + return true; + } + } + if (t instanceof IllegalStateException + && msg != null + && msg.contains("not previously added")) { + return true; + } + t = t.getCause(); + } + return false; + } + /** * Reinitialize the Paimon catalog. * This is used to recover from ThreadGroup destroyed errors caused by classloader unloading.