@@ -20,11 +20,12 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.util.NonCopyingByteArrayOutputStream;

/**
@@ -38,6 +39,20 @@
public class DeflateCodec extends Codec {

  private static final int DEFAULT_BUFFER_SIZE = 8192;
  private static final String MAX_DECOMPRESS_LENGTH_PROPERTY = "org.apache.avro.limits.decompress.maxLength";
  private static final long DEFAULT_MAX_DECOMPRESS_LENGTH = 200L * 1024 * 1024; // 200MB default limit

  private static long getMaxDecompressLength() {
    String prop = System.getProperty(MAX_DECOMPRESS_LENGTH_PROPERTY);
    if (prop != null) {
      try {
        return Long.parseLong(prop);
      } catch (NumberFormatException e) {
        // Use default
      }
    }
    return DEFAULT_MAX_DECOMPRESS_LENGTH;
  }

  static class Option extends CodecFactory {
    private final int compressionLevel;
@@ -78,10 +93,32 @@ public ByteBuffer compress(ByteBuffer data) throws IOException {

  @Override
  public ByteBuffer decompress(ByteBuffer data) throws IOException {
    long maxLength = getMaxDecompressLength();
    NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
    try (OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) {
      outputStream.write(data.array(), computeOffset(data), data.remaining());
    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
    long totalBytes = 0;

    Inflater inflater = getInflater();
    inflater.setInput(data.array(), computeOffset(data), data.remaining());

    try {
      while (!inflater.finished()) {
        int len = inflater.inflate(buffer);
        if (len == 0 && inflater.needsInput()) {
inflate() can return 0 when the stream isn’t finished and it doesn’t need more input (e.g., needsDictionary()), which would make this loop spin forever; also, breaking on needsInput() while !finished() risks returning truncated output without error. Consider explicitly handling the len == 0 cases (dictionary/truncated data) to avoid hangs and silent corruption.

Owner Author

value:useful; category:bug; feedback:The Augment AI reviewer is correct. A zero-byte return from inflate() has to be handled explicitly to avoid either an infinite loop or a truncated read. Both are serious for user applications: an infinite loop pins a CPU core, and a truncated read silently returns incomplete decompressed data.
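
The truncated-input case described above can be reproduced outside the codec. The following is a minimal standalone sketch, not code from this PR: the class and method names (`TruncatedInflateDemo`, `deflate`, `stallsOnInput`) are hypothetical, and it uses the default zlib-wrapped `Deflater`/`Inflater` rather than whatever `getInflater()` configures. It feeds the inflater only a prefix of a valid stream and reports whether `inflate()` returns 0 while `needsInput()` is true and `finished()` is false.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical demo class; not part of Avro.
final class TruncatedInflateDemo {

  /** Compresses the input with the default (zlib-wrapped) Deflater. */
  static byte[] deflate(byte[] input) {
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[256];
    while (!deflater.finished()) {
      int n = deflater.deflate(buf);
      out.write(buf, 0, n);
    }
    deflater.end();
    return out.toByteArray();
  }

  /**
   * Feeds only the first {@code length} bytes to an Inflater and returns true
   * if it stalls: inflate() yields 0 bytes, the stream is not finished, and
   * the inflater is asking for more input.
   */
  static boolean stallsOnInput(byte[] compressed, int length) {
    Inflater inflater = new Inflater();
    inflater.setInput(compressed, 0, length);
    byte[] buf = new byte[256];
    try {
      while (!inflater.finished()) {
        int n = inflater.inflate(buf);
        if (n == 0 && !inflater.finished()) {
          return inflater.needsInput();
        }
      }
      return false; // reached the end of the stream normally
    } catch (DataFormatException e) {
      throw new IllegalStateException("Unexpected corrupt data in demo", e);
    } finally {
      inflater.end();
    }
  }

  public static void main(String[] args) {
    byte[] original = "avro deflate codec ".repeat(50).getBytes(StandardCharsets.UTF_8);
    byte[] compressed = deflate(original);
    System.out.println("complete stream stalls:  " + stallsOnInput(compressed, compressed.length));
    System.out.println("truncated stream stalls: " + stallsOnInput(compressed, compressed.length / 2));
  }
}
```

A loop that simply breaks in the stalled state returns whatever was inflated so far, which is the silent truncation the reviewer warns about.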

          break;
        }
        totalBytes += len;
        if (totalBytes > maxLength) {
          throw new AvroRuntimeException(
This limit breach throws AvroRuntimeException even though decompress() is declared throws IOException; callers that currently catch IOException for decompression failures may miss this and crash. Consider using an IOException subclass (or otherwise documenting this behavior change) so error handling remains consistent.

Owner Author

value:good-to-have; category:bug; feedback:The Augment AI reviewer is correct. The method signature declares IOException, so that is what callers expect to handle. An AvroRuntimeException may go uncaught by those callers and surface as an internal error in the application.
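
One way to follow this suggestion is to signal the limit breach with an IOException subclass, so existing `catch (IOException e)` blocks still see it. The sketch below is only an illustration under that assumption: `DecompressLimitSketch`, `DecompressLimitExceededException` and `checkLimit` are hypothetical names, not existing Avro classes or methods.

```java
import java.io.IOException;

// Hypothetical sketch; none of these names exist in Avro.
final class DecompressLimitSketch {

  /** IOException subclass so callers that already catch IOException keep working. */
  static final class DecompressLimitExceededException extends IOException {
    DecompressLimitExceededException(long actual, long max) {
      super("Decompressed size " + actual + " exceeds maximum allowed size " + max);
    }
  }

  /** Throws once the running total goes past the limit; a total equal to the limit is allowed. */
  static void checkLimit(long totalBytes, long maxLength) throws IOException {
    if (totalBytes > maxLength) {
      throw new DecompressLimitExceededException(totalBytes, maxLength);
    }
  }

  public static void main(String[] args) {
    try {
      checkLimit(11, 10);
    } catch (IOException e) {
      // Handled by the same catch block as any other decompression failure.
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

A dedicated subclass also lets callers that care about the distinction catch the limit breach separately from corrupt-data errors.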

"Decompressed size " + totalBytes + " exceeds maximum allowed size " + maxLength
+ ". This can be configured by setting the system property " + MAX_DECOMPRESS_LENGTH_PROPERTY);
        }
        baos.write(buffer, 0, len);
      }
    } catch (DataFormatException e) {
      throw new IOException("Invalid deflate data", e);
    }

    return baos.asByteBuffer();
  }
Comment on lines 95 to 123
⚠️ Potential issue | 🔴 Critical

Fix “no progress” inflate states to avoid infinite loop / accepting truncated data
In the current loop, len == 0 can occur with needsDictionary() (infinite loop) and needsInput() while not finished() (currently break + returns partial output). These should be treated as invalid/incomplete deflate data and fail fast.

Proposed diff
   try {
     while (!inflater.finished()) {
       int len = inflater.inflate(buffer);
-      if (len == 0 && inflater.needsInput()) {
-        break;
-      }
+      if (len == 0) {
+        if (inflater.needsDictionary()) {
+          throw new IOException("Invalid deflate data (requires a preset dictionary)");
+        }
+        if (inflater.needsInput()) {
+          throw new IOException("Invalid deflate data (truncated input)");
+        }
+        // Defensive: avoid spinning if inflater makes no progress
+        throw new IOException("Invalid deflate data (stalled inflater)");
+      }
       totalBytes += len;
       if (totalBytes > maxLength) {
         throw new AvroRuntimeException(
             "Decompressed size " + totalBytes + " exceeds maximum allowed size " + maxLength
                 + ". This can be configured by setting the system property " + MAX_DECOMPRESS_LENGTH_PROPERTY);
       }
       baos.write(buffer, 0, len);
     }
   } catch (DataFormatException e) {
     throw new IOException("Invalid deflate data", e);
   }
+
+  if (!inflater.finished()) {
+    throw new IOException("Invalid deflate data (did not reach stream end)");
+  }

   return baos.asByteBuffer();
 }

Owner Author

value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct. A zero-byte return from inflate() has to be handled explicitly to avoid either an infinite loop or a truncated read. Both are serious for user applications: an infinite loop pins a CPU core, and a truncated read silently returns incomplete decompressed data.
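
For reference, the behaviour discussed in this thread (size cap, fail-fast on zero-progress states, IOException for every failure) can be exercised in a self-contained form. This is a rough sketch under stated assumptions rather than the change that was merged: `BoundedInflateSketch`, `inflateBounded` and `deflate` are hypothetical names, it works on plain byte arrays instead of ByteBuffer, and it uses the default zlib-wrapped streams because the configuration behind `getInflater()` is not shown in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch; not the Avro implementation.
final class BoundedInflateSketch {

  /** Inflates data, failing fast on oversized, truncated or stalled streams. */
  static byte[] inflateBounded(byte[] data, long maxLength) throws IOException {
    Inflater inflater = new Inflater(); // assumption: zlib-wrapped input
    inflater.setInput(data, 0, data.length);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[8192];
    long totalBytes = 0;
    try {
      while (!inflater.finished()) {
        int len = inflater.inflate(buffer);
        if (len == 0) {
          if (inflater.finished()) {
            break; // stream ended on this call without producing more output
          }
          if (inflater.needsDictionary()) {
            throw new IOException("Invalid deflate data (requires a preset dictionary)");
          }
          if (inflater.needsInput()) {
            throw new IOException("Invalid deflate data (truncated input)");
          }
          throw new IOException("Invalid deflate data (stalled inflater)");
        }
        totalBytes += len;
        if (totalBytes > maxLength) {
          throw new IOException(
              "Decompressed size " + totalBytes + " exceeds maximum allowed size " + maxLength);
        }
        out.write(buffer, 0, len);
      }
    } catch (DataFormatException e) {
      throw new IOException("Invalid deflate data", e);
    } finally {
      inflater.end();
    }
    return out.toByteArray();
  }

  /** Test helper: compresses the input with the default (zlib-wrapped) Deflater. */
  static byte[] deflate(byte[] input) {
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[8192];
    while (!deflater.finished()) {
      out.write(buf, 0, deflater.deflate(buf));
    }
    deflater.end();
    return out.toByteArray();
  }
}
```

The size check runs before each chunk is written, so the output never grows past the limit by more than one buffer's worth of inflated data held in `buffer`.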

