From d655771b126fccc84400bf040a2803add0ca506c Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 11 Feb 2026 17:34:02 -0800 Subject: [PATCH] add brotli support to Compression and Decompression stream --- src/workerd/api/BUILD.bazel | 1 + src/workerd/api/streams/compression.c++ | 171 ++++++++++++++++-- src/workerd/api/streams/compression.h | 4 +- src/workerd/api/streams/internal.c++ | 35 ++-- src/wpt/BUILD.bazel | 1 + src/wpt/compression-test.ts | 81 ++------- .../experimental/index.d.ts | 4 +- .../generated-snapshot/experimental/index.ts | 4 +- types/generated-snapshot/latest/index.d.ts | 4 +- types/generated-snapshot/latest/index.ts | 4 +- 10 files changed, 194 insertions(+), 115 deletions(-) diff --git a/src/workerd/api/BUILD.bazel b/src/workerd/api/BUILD.bazel index c8c46f1213c..51e5ef39f53 100644 --- a/src/workerd/api/BUILD.bazel +++ b/src/workerd/api/BUILD.bazel @@ -248,6 +248,7 @@ wd_cc_library( deps = [ "//src/workerd/io", "//src/workerd/util:state-machine", + "@capnp-cpp//src/kj/compat:kj-brotli", "@nbytes", ], ) diff --git a/src/workerd/api/streams/compression.c++ b/src/workerd/api/streams/compression.c++ index cb964b7f303..d0954481035 100644 --- a/src/workerd/api/streams/compression.c++ +++ b/src/workerd/api/streams/compression.c++ @@ -12,6 +12,9 @@ #include #include +#include +#include + namespace workerd::api { CompressionAllocator::CompressionAllocator( kj::Arc&& externalMemoryTarget) @@ -51,6 +54,21 @@ void CompressionAllocator::FreeForZlib(void* opaque, void* pointer) { namespace { +enum class Format { + GZIP, + DEFLATE, + DEFLATE_RAW, + BROTLI, +}; + +static Format parseFormat(kj::StringPtr format) { + if (format == "gzip") return Format::GZIP; + if (format == "deflate") return Format::DEFLATE; + if (format == "deflate-raw") return Format::DEFLATE_RAW; + if (format == "brotli") return Format::BROTLI; + KJ_UNREACHABLE; +} + class Context { public: enum class Mode { @@ -74,20 +92,26 @@ class Context { kj::Arc&& externalMemoryTarget) : allocator(kj::mv(externalMemoryTarget)), mode(mode), - strictCompression(flags) + strictCompression(flags), + format(parseFormat(format)) { + if (this->format == Format::BROTLI) { + initBrotli(); + return; + } + // Configure allocator before any stream operations. 
allocator.configure(&ctx); int result = Z_OK; switch (mode) { case Mode::COMPRESS: - result = deflateInit2(&ctx, Z_DEFAULT_COMPRESSION, Z_DEFLATED, getWindowBits(format), + result = deflateInit2(&ctx, Z_DEFAULT_COMPRESSION, Z_DEFLATED, getWindowBits(this->format), 8, // memLevel = 8 is the default Z_DEFAULT_STRATEGY); break; case Mode::DECOMPRESS: - result = inflateInit2(&ctx, getWindowBits(format)); + result = inflateInit2(&ctx, getWindowBits(this->format)); break; default: KJ_UNREACHABLE; @@ -96,6 +120,21 @@ class Context { } ~Context() noexcept(false) { + if (format == Format::BROTLI) { + switch (mode) { + case Mode::COMPRESS: + if (brotliEncoderState != nullptr) { + BrotliEncoderDestroyInstance(brotliEncoderState); + } + break; + case Mode::DECOMPRESS: + if (brotliDecoderState != nullptr) { + BrotliDecoderDestroyInstance(brotliDecoderState); + } + break; + } + return; + } switch (mode) { case Mode::COMPRESS: deflateEnd(&ctx); @@ -109,11 +148,19 @@ class Context { KJ_DISALLOW_COPY_AND_MOVE(Context); void setInput(const void* in, size_t size) { + if (format == Format::BROTLI) { + brotliNextIn = reinterpret_cast(in); + brotliAvailIn = size; + return; + } ctx.next_in = const_cast(reinterpret_cast(in)); ctx.avail_in = size; } Result pumpOnce(int flush) { + if (format == Format::BROTLI) { + return pumpBrotliOnce(flush); + } ctx.next_out = buffer; ctx.avail_out = sizeof(buffer); @@ -151,11 +198,76 @@ class Context { }; } + bool hasTrailingError() const { + return brotliTrailingError; + } + protected: CompressionAllocator allocator; private: - static int getWindowBits(kj::StringPtr format) { + void initBrotli() { + if (mode == Mode::COMPRESS) { + auto* instance = BrotliEncoderCreateInstance( + CompressionAllocator::AllocForBrotli, CompressionAllocator::FreeForZlib, &allocator); + JSG_REQUIRE(instance != nullptr, Error, "Failed to initialize compression context."_kj); + brotliEncoderState = instance; + return; + } + + auto* instance = BrotliDecoderCreateInstance( + CompressionAllocator::AllocForBrotli, CompressionAllocator::FreeForZlib, &allocator); + JSG_REQUIRE(instance != nullptr, Error, "Failed to initialize compression context."_kj); + brotliDecoderState = instance; + } + + Result pumpBrotliOnce(int flush) { + uint8_t* nextOut = buffer; + size_t availOut = sizeof(buffer); + + if (mode == Mode::COMPRESS) { + auto op = flush == Z_FINISH ? BROTLI_OPERATION_FINISH : BROTLI_OPERATION_PROCESS; + auto ok = BrotliEncoderCompressStream( + brotliEncoderState, op, &brotliAvailIn, &brotliNextIn, &availOut, &nextOut, nullptr); + JSG_REQUIRE(ok == BROTLI_TRUE, TypeError, "Compression failed."); + + bool shouldContinue = brotliAvailIn > 0 || BrotliEncoderHasMoreOutput(brotliEncoderState); + if (op == BROTLI_OPERATION_FINISH && !BrotliEncoderIsFinished(brotliEncoderState)) { + shouldContinue = true; + } + + return Result{ + .success = shouldContinue, + .buffer = kj::arrayPtr(buffer, sizeof(buffer) - availOut), + }; + } + + auto result = BrotliDecoderDecompressStream( + brotliDecoderState, &brotliAvailIn, &brotliNextIn, &availOut, &nextOut, nullptr); + JSG_REQUIRE(result != BROTLI_DECODER_RESULT_ERROR, TypeError, "Decompression failed."); + + if (strictCompression == ContextFlags::STRICT) { + // Track trailing data so we can surface the error after buffered output drains. 
+ if (BrotliDecoderIsFinished(brotliDecoderState) && brotliAvailIn > 0) { + brotliTrailingError = true; + } + if (flush == Z_FINISH && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && + availOut == sizeof(buffer)) { + JSG_FAIL_REQUIRE( + TypeError, "Called close() on a decompression stream with incomplete data"); + } + } + + bool shouldContinue = result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT || + BrotliDecoderHasMoreOutput(brotliDecoderState); + + return Result{ + .success = shouldContinue, + .buffer = kj::arrayPtr(buffer, sizeof(buffer) - availOut), + }; + } + + static int getWindowBits(Format format) { // We use a windowBits value of 15 combined with the magic value // for the compression format type. For gzip, the magic value is // 16, so the value returned is 15 + 16. For deflate, the magic @@ -165,12 +277,16 @@ class Context { static constexpr auto GZIP = 16; static constexpr auto DEFLATE = 15; static constexpr auto DEFLATE_RAW = -15; - if (format == "gzip") - return DEFLATE + GZIP; - else if (format == "deflate") - return DEFLATE; - else if (format == "deflate-raw") - return DEFLATE_RAW; + switch (format) { + case Format::GZIP: + return DEFLATE + GZIP; + case Format::DEFLATE: + return DEFLATE; + case Format::DEFLATE_RAW: + return DEFLATE_RAW; + case Format::BROTLI: + KJ_UNREACHABLE; + } KJ_UNREACHABLE; } @@ -180,6 +296,14 @@ class Context { // For the eponymous compatibility flag ContextFlags strictCompression; + Format format; + const uint8_t* brotliNextIn = nullptr; + size_t brotliAvailIn = 0; + // Brotli state structs are opaque, so kj::Own would require complete types. + BrotliEncoderState* brotliEncoderState = nullptr; + BrotliDecoderState* brotliDecoderState = nullptr; + // Defer reporting of trailing brotli bytes until output is drained. + bool brotliTrailingError = false; }; // Buffer class based on std::vector that erases data that has been read from it lazily to avoid @@ -289,9 +413,18 @@ class CompressionStreamBase: public kj::Refcounted, KJ_ASSERT(minBytes <= maxBytes); // Re-throw any stored exception throwIfException(); - // If stream has ended normally and no buffered data, return EOF - if (isInTerminalState() && output.empty()) { - co_return static_cast(0); + if (output.empty()) { + // For brotli we defer trailing-data errors until buffered output is drained. + if (context.hasTrailingError()) { + auto ex = + JSG_KJ_EXCEPTION(FAILED, TypeError, "Trailing bytes after end of compressed data"); + cancelInternal(kj::cp(ex)); + kj::throwFatalException(kj::mv(ex)); + } + // If stream has ended normally and no buffered data, return EOF. 
+ if (isInTerminalState()) { + co_return static_cast(0); + } } // Active or terminal with data remaining co_return co_await tryReadInternal( @@ -659,8 +792,10 @@ kj::Rc> createDecompressionStre } // namespace jsg::Ref CompressionStream::constructor(jsg::Lock& js, kj::String format) { - JSG_REQUIRE(format == "deflate" || format == "gzip" || format == "deflate-raw", TypeError, - "The compression format must be either 'deflate', 'deflate-raw' or 'gzip'."); + JSG_REQUIRE( + format == "deflate" || format == "gzip" || format == "deflate-raw" || format == "brotli", + TypeError, + "The compression format must be either 'deflate', 'deflate-raw', 'gzip', or 'brotli'."); // TODO(cleanup): Once the autogate is removed, we can delete CompressionStreamImpl kj::Rc> impl = createCompressionStreamImpl( @@ -679,8 +814,10 @@ jsg::Ref CompressionStream::constructor(jsg::Lock& js, kj::St } jsg::Ref DecompressionStream::constructor(jsg::Lock& js, kj::String format) { - JSG_REQUIRE(format == "deflate" || format == "gzip" || format == "deflate-raw", TypeError, - "The compression format must be either 'deflate', 'deflate-raw' or 'gzip'."); + JSG_REQUIRE( + format == "deflate" || format == "gzip" || format == "deflate-raw" || format == "brotli", + TypeError, + "The compression format must be either 'deflate', 'deflate-raw', 'gzip', or 'brotli'."); kj::Rc> impl = createDecompressionStreamImpl(kj::mv(format), diff --git a/src/workerd/api/streams/compression.h b/src/workerd/api/streams/compression.h index 5b93142b706..25bd49e0496 100644 --- a/src/workerd/api/streams/compression.h +++ b/src/workerd/api/streams/compression.h @@ -44,7 +44,7 @@ class CompressionStream: public TransformStream { JSG_INHERIT(TransformStream); JSG_TS_OVERRIDE(extends TransformStream { constructor(format - : "gzip" | "deflate" | "deflate-raw"); + : "gzip" | "deflate" | "deflate-raw" | "brotli"); }); } }; @@ -59,7 +59,7 @@ class DecompressionStream: public TransformStream { JSG_INHERIT(TransformStream); JSG_TS_OVERRIDE(extends TransformStream { constructor(format - : "gzip" | "deflate" | "deflate-raw"); + : "gzip" | "deflate" | "deflate-raw" | "brotli"); }); } }; diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index c6f1fe6b97b..952d7b070fb 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -18,20 +18,6 @@ namespace workerd::api { namespace { -// Use this in places where the exception thrown would cause finalizers to run. Your exception -// will not go anywhere, but we'll log the exception message to the console until the problem this -// papers over is fixed. -[[noreturn]] void throwTypeErrorAndConsoleWarn(kj::StringPtr message) { - KJ_IF_SOME(context, IoContext::tryCurrent()) { - if (context.isInspectorEnabled()) { - context.logWarning(message); - } - } - - kj::throwFatalException(kj::Exception(kj::Exception::Type::FAILED, __FILE__, __LINE__, - kj::str(JSG_EXCEPTION(TypeError) ": ", message))); -} - kj::Promise pumpTo(ReadableStreamSource& input, WritableStreamSink& output, bool end) { kj::byte buffer[4096]{}; @@ -1072,8 +1058,18 @@ jsg::Promise WritableStreamInternalController::write( return js.rejectedPromise(errored.addRef(js)); } KJ_CASE_ONEOF(writable, IoOwn) { + // Byte streams must reject invalid chunks and error the stream so reads fail too. 
+ auto rejectInvalidChunk = [&](kj::StringPtr message) { + auto reason = js.v8TypeError(message); + writable->abort(js.exceptionToKj(js.v8Ref(reason))); + doError(js, reason); + return js.rejectedPromise(reason); + }; + if (value == kj::none) { - return js.resolvedPromise(); + return rejectInvalidChunk( + "This TransformStream is being used as a byte stream, but received an object of " + "non-ArrayBuffer/ArrayBufferView type on its writable side."); } auto chunk = KJ_ASSERT_NONNULL(value); @@ -1090,16 +1086,14 @@ jsg::Promise WritableStreamInternalController::write( byteLength = view->ByteLength(); byteOffset = view->ByteOffset(); } else if (chunk->IsString()) { - // TODO(later): This really ought to return a rejected promise and not a sync throw. // This case caused me a moment of confusion during testing, so I think it's worth // a specific error message. - throwTypeErrorAndConsoleWarn( + return rejectInvalidChunk( "This TransformStream is being used as a byte stream, but received a string on its " "writable side. If you wish to write a string, you'll probably want to explicitly " "UTF-8-encode it with TextEncoder."); } else { - // TODO(later): This really ought to return a rejected promise and not a sync throw. - throwTypeErrorAndConsoleWarn( + return rejectInvalidChunk( "This TransformStream is being used as a byte stream, but received an object of " "non-ArrayBuffer/ArrayBufferView type on its writable side."); } @@ -1116,8 +1110,7 @@ jsg::Promise WritableStreamInternalController::write( auto ptr = kj::ArrayPtr(static_cast(store->Data()) + byteOffset, byteLength); if (store->IsShared()) { - throwTypeErrorAndConsoleWarn( - "Cannot construct an array buffer from a shared backing store"); + return rejectInvalidChunk("Cannot construct an array buffer from a shared backing store"); } queue.push_back( WriteEvent{.outputLock = IoContext::current().waitForOutputLocksIfNecessaryIoOwn(), diff --git a/src/wpt/BUILD.bazel b/src/wpt/BUILD.bazel index a7382c9ba7d..3c3092201f1 100644 --- a/src/wpt/BUILD.bazel +++ b/src/wpt/BUILD.bazel @@ -82,6 +82,7 @@ wpt_test( wpt_test( name = "compression", + size = "enormous", config = "compression-test.ts", start_server = True, target_compatible_with = select({ diff --git a/src/wpt/compression-test.ts b/src/wpt/compression-test.ts index c4438fa624c..bab9ffedce6 100644 --- a/src/wpt/compression-test.ts +++ b/src/wpt/compression-test.ts @@ -5,64 +5,20 @@ import { type TestRunnerConfig } from 'harness/harness'; export default { - 'compression-bad-chunks.any.js': { - comment: 'Test times out - needs investigation', - disabledTests: true, - }, + 'compression-bad-chunks.any.js': {}, 'compression-constructor-error.any.js': {}, - 'compression-including-empty-chunk.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [ - "the result of compressing [,Hello,Hello] with brotli should be 'HelloHello'", - "the result of compressing [Hello,,Hello] with brotli should be 'HelloHello'", - "the result of compressing [Hello,Hello,] with brotli should be 'HelloHello'", - ], - }, - 'compression-large-flush-output.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: ['brotli compression with large flush output'], - }, - 'compression-multiple-chunks.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/compressing \d+ chunks with brotli should work/], - }, - 'compression-output-length.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [ - 'the length of brotli data should 
be shorter than that of the original data', - ], - }, - 'compression-stream.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [ - /brotli .* data should be reinflated back to its origin/, - ], - }, + 'compression-including-empty-chunk.any.js': {}, + 'compression-large-flush-output.any.js': {}, + 'compression-multiple-chunks.any.js': {}, + 'compression-output-length.any.js': {}, + 'compression-stream.any.js': {}, 'compression-with-detach.window.js': {}, - 'decompression-bad-chunks.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/brotli/], - }, - 'decompression-buffersource.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/brotli/], - }, - 'decompression-constructor-error.any.js': { - comment: - 'brotli compression is not supported - these pass because brotli throws', - }, - 'decompression-correct-input.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/.*brotli.*/], - }, - 'decompression-corrupt-input.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/brotli/], - }, - 'decompression-empty-input.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/.*brotli.*/], - }, + 'decompression-bad-chunks.any.js': {}, + 'decompression-buffersource.any.js': {}, + 'decompression-constructor-error.any.js': {}, + 'decompression-correct-input.any.js': {}, + 'decompression-corrupt-input.any.js': {}, + 'decompression-empty-input.any.js': {}, 'decompression-extra-input.any.js': { comment: 'Extra padding tests fail - workerd handles trailing data differently', @@ -70,19 +26,10 @@ export default { 'decompressing deflate input with extra pad should still give the output', 'decompressing gzip input with extra pad should still give the output', 'decompressing deflate-raw input with extra pad should still give the output', - /brotli/, - ], - }, - 'decompression-split-chunk.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [/.*brotli/], - }, - 'decompression-uint8array-output.any.js': { - comment: 'brotli compression is not supported', - expectedFailures: [ - 'decompressing brotli output should give Uint8Array chunks', ], }, + 'decompression-split-chunk.any.js': {}, + 'decompression-uint8array-output.any.js': {}, 'decompression-with-detach.window.js': { comment: 'Detach test fails - needs investigation', expectedFailures: [ diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index d5fce056771..b13355cedf3 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -3111,7 +3111,7 @@ declare class CompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`DecompressionStream`** interface of the Compression Streams API is an API for decompressing a stream of data. @@ -3122,7 +3122,7 @@ declare class DecompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`TextEncoderStream`** interface of the Encoding API converts a stream of strings into bytes in the UTF-8 encoding. 
diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 71c0d95e46c..0984d29c22a 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -3117,7 +3117,7 @@ export declare class CompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`DecompressionStream`** interface of the Compression Streams API is an API for decompressing a stream of data. @@ -3128,7 +3128,7 @@ export declare class DecompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`TextEncoderStream`** interface of the Encoding API converts a stream of strings into bytes in the UTF-8 encoding. diff --git a/types/generated-snapshot/latest/index.d.ts b/types/generated-snapshot/latest/index.d.ts index e037d3df8bb..09479d7a19b 100755 --- a/types/generated-snapshot/latest/index.d.ts +++ b/types/generated-snapshot/latest/index.d.ts @@ -3017,7 +3017,7 @@ declare class CompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`DecompressionStream`** interface of the Compression Streams API is an API for decompressing a stream of data. @@ -3028,7 +3028,7 @@ declare class DecompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`TextEncoderStream`** interface of the Encoding API converts a stream of strings into bytes in the UTF-8 encoding. diff --git a/types/generated-snapshot/latest/index.ts b/types/generated-snapshot/latest/index.ts index a8607b690f6..113fac0eec7 100755 --- a/types/generated-snapshot/latest/index.ts +++ b/types/generated-snapshot/latest/index.ts @@ -3023,7 +3023,7 @@ export declare class CompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`DecompressionStream`** interface of the Compression Streams API is an API for decompressing a stream of data. @@ -3034,7 +3034,7 @@ export declare class DecompressionStream extends TransformStream< ArrayBuffer | ArrayBufferView, Uint8Array > { - constructor(format: "gzip" | "deflate" | "deflate-raw"); + constructor(format: "gzip" | "deflate" | "deflate-raw" | "brotli"); } /** * The **`TextEncoderStream`** interface of the Encoding API converts a stream of strings into bytes in the UTF-8 encoding.
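Usage note (not part of the patch): a minimal sketch of how the new 'brotli' format could be exercised from a Worker once this change lands, assuming only the standard Compression Streams API surface that the patch extends. The roundTripBrotli helper and the sample input are illustrative, not something introduced by this change.

// Round-trip a string through CompressionStream/DecompressionStream using
// the newly accepted 'brotli' format token.
async function roundTripBrotli(input: string): Promise<string> {
  const bytes = new TextEncoder().encode(input);

  // Compress: pipe the raw bytes through a brotli CompressionStream.
  const compressedBody = new Response(bytes).body!.pipeThrough(
    new CompressionStream('brotli')
  );

  // Decompress: pipe the compressed bytes back through a brotli DecompressionStream.
  const decompressedBody = compressedBody.pipeThrough(
    new DecompressionStream('brotli')
  );

  // Collect the output and decode it back to text.
  const out = await new Response(decompressedBody).arrayBuffer();
  return new TextDecoder().decode(out);
}

// Expected: resolves to the original string, matching the WPT round-trip tests
// enabled in src/wpt/compression-test.ts above.
// const text = await roundTripBrotli('HelloHello');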