diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java index e6d58e46a13..e50ace19287 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java @@ -20,11 +20,12 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.Inflater; -import java.util.zip.InflaterOutputStream; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.util.NonCopyingByteArrayOutputStream; /** @@ -38,6 +39,20 @@ public class DeflateCodec extends Codec { private static final int DEFAULT_BUFFER_SIZE = 8192; + private static final String MAX_DECOMPRESS_LENGTH_PROPERTY = "org.apache.avro.limits.decompress.maxLength"; + private static final long DEFAULT_MAX_DECOMPRESS_LENGTH = 200L * 1024 * 1024; // 200MB default limit + + private static long getMaxDecompressLength() { + String prop = System.getProperty(MAX_DECOMPRESS_LENGTH_PROPERTY); + if (prop != null) { + try { + return Long.parseLong(prop); + } catch (NumberFormatException e) { + // Use default + } + } + return DEFAULT_MAX_DECOMPRESS_LENGTH; + } static class Option extends CodecFactory { private final int compressionLevel; @@ -78,10 +93,32 @@ public ByteBuffer compress(ByteBuffer data) throws IOException { @Override public ByteBuffer decompress(ByteBuffer data) throws IOException { + long maxLength = getMaxDecompressLength(); NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE); - try (OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) { - outputStream.write(data.array(), computeOffset(data), data.remaining()); + byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + long totalBytes = 0; + + Inflater inflater = getInflater(); + inflater.setInput(data.array(), computeOffset(data), data.remaining()); + + try { + while (!inflater.finished()) { + int len = inflater.inflate(buffer); + if (len == 0 && inflater.needsInput()) { + break; + } + totalBytes += len; + if (totalBytes > maxLength) { + throw new AvroRuntimeException( + "Decompressed size " + totalBytes + " exceeds maximum allowed size " + maxLength + + ". This can be configured by setting the system property " + MAX_DECOMPRESS_LENGTH_PROPERTY); + } + baos.write(buffer, 0, len); + } + } catch (DataFormatException e) { + throw new IOException("Invalid deflate data", e); } + return baos.asByteBuffer(); }