diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 74f9b147bbd6..c5764b39bc69 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -402,31 +402,24 @@ private static Object identityOrGcsToLocalFile(Object configValue) { LOG.info( "Downloading {} into local filesystem ({})", configStr, localFile.toAbsolutePath()); // TODO(pabloem): Only copy if file does not exist. - ReadableByteChannel channel = - FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId()); - FileOutputStream outputStream = new FileOutputStream(localFile.toFile()); - - // Create a WritableByteChannel to write data to the FileOutputStream - WritableByteChannel outputChannel = Channels.newChannel(outputStream); - - // Read data from the ReadableByteChannel and write it to the WritableByteChannel - ByteBuffer buffer = ByteBuffer.allocate(1024); - while (channel.read(buffer) != -1) { - buffer.flip(); - outputChannel.write(buffer); - buffer.compact(); + try (ReadableByteChannel channel = + FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId()); + FileOutputStream outputStream = new FileOutputStream(localFile.toFile()); + WritableByteChannel outputChannel = Channels.newChannel(outputStream)) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (channel.read(buffer) != -1) { + buffer.flip(); + outputChannel.write(buffer); + buffer.compact(); + } } - - // Close the channels and the output stream - channel.close(); - outputChannel.close(); - outputStream.close(); return localFile.toAbsolutePath().toString(); } catch (IOException e) { throw new IllegalArgumentException( String.format( "Unable to fetch file %s to be used locally to create a Kafka Consumer.", - configStr)); + configStr), + e); } } else { return configValue;