Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.WriteListResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.hadoop.fs.FileSystem;
import org.apache.paimon.catalog.*;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.hadoop.HadoopFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
Expand All @@ -34,6 +34,7 @@
import org.apache.paimon.types.RowKind;

import java.io.Closeable;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.*;
Expand Down Expand Up @@ -255,6 +256,7 @@ private Configuration buildHadoopConfiguration() {
// NoClassDefFoundError due to classloader/version conflicts.
// Ensure S3A filesystem is used when scheme is s3a
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.set("fs.s3a.impl.disable.cache", "true");
conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A");

}
Expand Down Expand Up @@ -796,11 +798,11 @@ private WriteListResult<TapRecordEvent> writeRecordsWithStreamWriteInternal(List
WriteListResult<TapRecordEvent> result = new WriteListResult<>();
Identifier identifier = Identifier.create(database, tableName);

// Get or create cached writer and commit
StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier);
StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier);

try {
// Get or create cached writer and commit
StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier);
StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier);

// Write all records to the writer
for (TapRecordEvent event : recordEvents) {
if (event instanceof TapInsertRecordEvent) {
Expand Down Expand Up @@ -946,11 +948,30 @@ private void cleanupAllResources() {
// Close old catalog if exists
if (catalog != null) {
try {
if (catalog instanceof CachingCatalog) {
CachingCatalog cachingCatalog = (CachingCatalog) catalog;
Catalog wrapped = cachingCatalog.wrapped();
if (wrapped instanceof FileSystemCatalog) {
FileSystemCatalog fileSystemCatalog = (FileSystemCatalog) wrapped;
FileIO fileIO = null;
try {
fileIO = fileSystemCatalog.fileIO();
} catch (Throwable ignore) {
// Ignore fileIO lookup errors
}

// Best-effort close: proactively close FileSystem instances cached by HadoopFileIO
closeHadoopFileIOCachedFileSystems(fileIO);
closeQuietly(fileIO);
}
}

catalog.close();
} catch (Exception e) {
} catch (Throwable e) {
// Ignore close errors
} finally {
catalog = null;
}
catalog = null;
}

// Wait a bit to ensure all internal threads are cleaned up
Expand All @@ -962,6 +983,66 @@ private void cleanupAllResources() {
}
}

/**
 * Closes the given resource on a best-effort basis.
 * <p>
 * A {@code null} argument is a no-op, and any {@link Exception} raised by
 * {@code close()} is deliberately discarded so that the caller's cleanup
 * sequence can keep going.
 *
 * @param closeable the resource to close; may be {@code null}
 */
private void closeQuietly(Closeable closeable) {
    if (closeable != null) {
        try {
            closeable.close();
        } catch (Exception ignored) {
            // Best-effort cleanup: a failed close must not propagate to the caller
        }
    }
}

/**
 * Best-effort close for cached Hadoop FileSystem instances inside Paimon HadoopFileIO.
 * <p>
 * HadoopFileIO may cache FileSystem instances (e.g., in a field named "fsMap"). Even if
 * Hadoop global FileSystem cache is disabled, this internal cache can still keep an S3A
 * FileSystem whose thread factory captured a Task ThreadGroup that will be destroyed later.
 * <p>
 * The "fsMap" field is looked up reflectively on the runtime class of {@code fileIO} and
 * then on each of its superclasses, so the lookup also succeeds when {@code fileIO} is a
 * subclass of HadoopFileIO. If no such field exists, or it does not hold a {@link Map},
 * the method does nothing.
 * <p>
 * This method never throws: every failure (including {@link Error}s) is swallowed, and a
 * failure to close one FileSystem does not prevent the remaining ones from being closed
 * or the cache map from being cleared.
 *
 * @param fileIO the candidate file IO object; ignored unless it is a HadoopFileIO
 */
private void closeHadoopFileIOCachedFileSystems(Object fileIO) {
    if (!(fileIO instanceof HadoopFileIO)) {
        return;
    }

    try {
        // getDeclaredField only sees fields declared on the exact class it is called on,
        // so walk up the hierarchy to also cover subclasses of HadoopFileIO.
        Field fsMapField = null;
        for (Class<?> type = fileIO.getClass(); type != null; type = type.getSuperclass()) {
            try {
                fsMapField = type.getDeclaredField("fsMap");
                break;
            } catch (NoSuchFieldException ignored) {
                // Not declared at this level; keep looking in the superclass
            }
        }
        if (fsMapField == null) {
            // HadoopFileIO implementation differs; nothing to clean up.
            return;
        }

        fsMapField.setAccessible(true);
        Object fsMapObject = fsMapField.get(fileIO);
        if (!(fsMapObject instanceof Map)) {
            return;
        }

        Map<?, ?> fsMap = (Map<?, ?>) fsMapObject;
        if (fsMap.isEmpty()) {
            return;
        }

        // Copy values first to avoid ConcurrentModificationException in case close triggers internal updates.
        List<Object> fileSystems = new ArrayList<>(fsMap.values());
        for (Object fs : fileSystems) {
            if (fs instanceof FileSystem) {
                try {
                    ((FileSystem) fs).close();
                } catch (Throwable ignored) {
                    // Ignore close errors (including Errors) so the remaining
                    // FileSystem instances still get closed and the map is cleared
                }
            }
        }

        try {
            fsMap.clear();
        } catch (Exception ignored) {
            // Ignore clear errors (e.g. the map may be unmodifiable)
        }
    } catch (Throwable ignored) {
        // Best-effort only
    }
}

/**
* Check if the exception is caused by ThreadGroup being destroyed.
* This typically happens when the classloader that created Paimon's thread factory
Expand Down
Loading