From 503a551ae1b081052f686c213abe4a887f0df2de Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Mon, 23 Feb 2026 01:42:05 +0800 Subject: [PATCH] Fix resource leak in KafkaIO GCS truststore file download Convert manual resource close to try-with-resources in identityOrGcsToLocalFile to prevent leaking ReadableByteChannel, FileOutputStream, and WritableByteChannel when an IOException occurs during the copy loop. Also preserve the original IOException as the cause of the IllegalArgumentException. --- .../KafkaReadSchemaTransformProvider.java | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 74f9b147bbd6..c5764b39bc69 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -402,31 +402,24 @@ private static Object identityOrGcsToLocalFile(Object configValue) { LOG.info( "Downloading {} into local filesystem ({})", configStr, localFile.toAbsolutePath()); // TODO(pabloem): Only copy if file does not exist. - ReadableByteChannel channel = - FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId()); - FileOutputStream outputStream = new FileOutputStream(localFile.toFile()); - - // Create a WritableByteChannel to write data to the FileOutputStream - WritableByteChannel outputChannel = Channels.newChannel(outputStream); - - // Read data from the ReadableByteChannel and write it to the WritableByteChannel - ByteBuffer buffer = ByteBuffer.allocate(1024); - while (channel.read(buffer) != -1) { - buffer.flip(); - outputChannel.write(buffer); - buffer.compact(); + try (ReadableByteChannel channel = + FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId()); + FileOutputStream outputStream = new FileOutputStream(localFile.toFile()); + WritableByteChannel outputChannel = Channels.newChannel(outputStream)) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (channel.read(buffer) != -1) { + buffer.flip(); + outputChannel.write(buffer); + buffer.compact(); + } } - - // Close the channels and the output stream - channel.close(); - outputChannel.close(); - outputStream.close(); return localFile.toAbsolutePath().toString(); } catch (IOException e) { throw new IllegalArgumentException( String.format( "Unable to fetch file %s to be used locally to create a Kafka Consumer.", - configStr)); + configStr), + e); } } else { return configValue;