From ab0e269baf2ece73a396df7c6375a650a34a3e34 Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 11:05:14 +0000 Subject: [PATCH 1/9] Add ZSTD bindings to the node:zlib package. --- build/deps/deps.jsonc | 4 + build/deps/gen/deps.MODULE.bazel | 3 + src/node/internal/internal_errors.ts | 6 + src/node/internal/internal_zlib.ts | 112 ++++++- src/node/internal/internal_zlib_base.ts | 90 ++++- src/node/internal/zlib.d.ts | 100 ++++++ src/node/zlib.ts | 28 ++ src/workerd/api/node/BUILD.bazel | 1 + src/workerd/api/node/tests/BUILD.bazel | 7 + .../api/node/tests/zlib-nodejs-test.js | 35 ++ .../api/node/tests/zstd-nodejs-test.js | 313 ++++++++++++++++++ .../api/node/tests/zstd-nodejs-test.wd-test | 14 + src/workerd/api/node/zlib-util.c++ | 312 +++++++++++++++++ src/workerd/api/node/zlib-util.h | 175 +++++++++- 14 files changed, 1192 insertions(+), 8 deletions(-) create mode 100644 src/workerd/api/node/tests/zstd-nodejs-test.js create mode 100644 src/workerd/api/node/tests/zstd-nodejs-test.wd-test diff --git a/build/deps/deps.jsonc b/build/deps/deps.jsonc index 423d0067973..8b74d175cee 100644 --- a/build/deps/deps.jsonc +++ b/build/deps/deps.jsonc @@ -19,6 +19,10 @@ "name": "brotli", "type": "bazel_dep" }, + { + "name": "zstd", + "type": "bazel_dep" + }, { "name": "tcmalloc", "type": "bazel_dep" diff --git a/build/deps/gen/deps.MODULE.bazel b/build/deps/gen/deps.MODULE.bazel index ff6afb08aa3..7c2e8c1dc0f 100644 --- a/build/deps/gen/deps.MODULE.bazel +++ b/build/deps/gen/deps.MODULE.bazel @@ -142,3 +142,6 @@ git_override( ], remote = "https://chromium.googlesource.com/chromium/src/third_party/zlib.git", ) + +# zstd +bazel_dep(name = "zstd", version = "1.5.7.bcr.1") diff --git a/src/node/internal/internal_errors.ts b/src/node/internal/internal_errors.ts index a062b46eda6..554a8d228e7 100644 --- a/src/node/internal/internal_errors.ts +++ b/src/node/internal/internal_errors.ts @@ -556,6 +556,12 @@ export class ERR_BROTLI_INVALID_PARAM extends NodeRangeError { } } +export class ERR_ZSTD_INVALID_PARAM extends NodeRangeError { + constructor(value: unknown) { + super('ERR_ZSTD_INVALID_PARAM', `${value} is not a valid Zstd parameter`); + } +} + export class ERR_ZLIB_INITIALIZATION_FAILED extends NodeError { constructor() { super('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed'); diff --git a/src/node/internal/internal_zlib.ts b/src/node/internal/internal_zlib.ts index 1321a10b6f9..e112691a152 100644 --- a/src/node/internal/internal_zlib.ts +++ b/src/node/internal/internal_zlib.ts @@ -7,11 +7,17 @@ import { default as zlibUtil, type ZlibOptions, type BrotliOptions, + type ZstdOptions, } from 'node-internal:zlib'; import { Buffer } from 'node-internal:internal_buffer'; import { validateUint32 } from 'node-internal:validators'; import { ERR_INVALID_ARG_TYPE } from 'node-internal:internal_errors'; -import { Zlib, Brotli, type ZlibBase } from 'node-internal:internal_zlib_base'; +import { + Zlib, + Brotli, + Zstd, + type ZlibBase, +} from 'node-internal:internal_zlib_base'; type ZlibResult = Buffer | { buffer: Buffer; engine: ZlibBase }; type CompressCallback = (err: Error | null, result?: ZlibResult) => void; @@ -26,6 +32,8 @@ const { CONST_UNZIP, CONST_BROTLI_DECODE, CONST_BROTLI_ENCODE, + CONST_ZSTD_ENCODE, + CONST_ZSTD_DECODE, } = zlibUtil; const ZlibMode = { @@ -328,6 +336,88 @@ export function brotliCompress( processChunkCaptureError(new BrotliCompress(options), data, callback); } + +export function zstdDecompressSync( + data: ArrayBufferView | string, + options: ZstdOptions = {} +): ZlibResult { + if (!options.info) { + // Fast path, where we send the data directly to C++ + return Buffer.from(zlibUtil.zstdDecompressSync(data, options)); + } + + // Else, use the Engine class in sync mode + return processChunk(new ZstdDecompress(options), data); +} + +export function zstdCompressSync( + data: ArrayBufferView | string, + options: ZstdOptions = {} +): ZlibResult { + if (!options.info) { + // Fast path, where we send the data directly to C++ + return Buffer.from(zlibUtil.zstdCompressSync(data, options)); + } + + // Else, use the Engine class in sync mode + return processChunk(new ZstdCompress(options), data); +} + +export function zstdDecompress( + data: ArrayBufferView | string, + optionsOrCallback: ZstdOptions | CompressCallback, + callbackOrUndefined?: CompressCallback +): void { + const [options, callback] = normalizeArgs( + optionsOrCallback, + callbackOrUndefined + ); + + if (!options.info) { + // Fast path + zlibUtil.zstdDecompress(data, options, (res) => { + queueMicrotask(() => { + if (res instanceof Error) { + callback(res); + } else { + callback(null, Buffer.from(res)); + } + }); + }); + + return; + } + + processChunkCaptureError(new ZstdDecompress(options), data, callback); +} + +export function zstdCompress( + data: ArrayBufferView | string, + optionsOrCallback: ZstdOptions | CompressCallback, + callbackOrUndefined?: CompressCallback +): void { + const [options, callback] = normalizeArgs( + optionsOrCallback, + callbackOrUndefined + ); + + if (!options.info) { + // Fast path + zlibUtil.zstdCompress(data, options, (res) => { + queueMicrotask(() => { + if (res instanceof Error) { + callback(res); + } else { + callback(null, Buffer.from(res)); + } + }); + }); + + return; + } + + processChunkCaptureError(new ZstdCompress(options), data, callback); +} export class Gzip extends Zlib { constructor(options: ZlibOptions) { super(options, CONST_GZIP); @@ -385,6 +475,18 @@ export class BrotliDecompress extends Brotli { } } +export class ZstdCompress extends Zstd { + constructor(options: ZstdOptions) { + super(options, CONST_ZSTD_ENCODE); + } +} + +export class ZstdDecompress extends Zstd { + constructor(options: ZstdOptions) { + super(options, CONST_ZSTD_DECODE); + } +} + const CLASS_BY_MODE: Record< (typeof ZlibMode)[keyof typeof ZlibMode], | typeof Deflate @@ -440,3 +542,11 @@ export function createBrotliDecompress( ): BrotliDecompress { return new BrotliDecompress(options); } + +export function createZstdCompress(options: ZstdOptions): ZstdCompress { + return new ZstdCompress(options); +} + +export function createZstdDecompress(options: ZstdOptions): ZstdDecompress { + return new ZstdDecompress(options); +} diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts index 68e8adf7707..af203d360dc 100644 --- a/src/node/internal/internal_zlib_base.ts +++ b/src/node/internal/internal_zlib_base.ts @@ -7,6 +7,7 @@ import { default as zlibUtil, type ZlibOptions, type BrotliOptions, + type ZstdOptions, } from 'node-internal:zlib'; import { Buffer, kMaxLength } from 'node-internal:internal_buffer'; import { @@ -18,6 +19,7 @@ import { ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, ERR_BROTLI_INVALID_PARAM, + ERR_ZSTD_INVALID_PARAM, ERR_ZLIB_INITIALIZATION_FAILED, NodeError, } from 'node-internal:internal_errors'; @@ -62,20 +64,29 @@ const { CONST_BROTLI_OPERATION_EMIT_METADATA, CONST_BROTLI_OPERATION_FINISH, CONST_BROTLI_OPERATION_FLUSH, + CONST_ZSTD_ENCODE, + CONST_ZSTD_DECODE, + CONST_ZSTD_e_continue, + CONST_ZSTD_e_end, + CONST_ZSTD_e_flush, } = zlibUtil; // This type contains all possible handler types. type ZlibHandleType = | zlibUtil.ZlibStream | zlibUtil.BrotliEncoder - | zlibUtil.BrotliDecoder; + | zlibUtil.BrotliDecoder + | zlibUtil.ZstdEncoder + | zlibUtil.ZstdDecoder; export const owner_symbol = Symbol('owner'); const FLUSH_BOUND_IDX_NORMAL: number = 0; const FLUSH_BOUND_IDX_BROTLI: number = 1; -const FLUSH_BOUND: [[number, number], [number, number]] = [ +const FLUSH_BOUND_IDX_ZSTD: number = 2; +const FLUSH_BOUND: [[number, number], [number, number], [number, number]] = [ [CONST_Z_NO_FLUSH, CONST_Z_BLOCK], [CONST_BROTLI_OPERATION_PROCESS, CONST_BROTLI_OPERATION_EMIT_METADATA], + [CONST_ZSTD_e_continue, CONST_ZSTD_e_end], ]; const kFlushFlag = Symbol('kFlushFlag'); @@ -369,10 +380,12 @@ export class ZlibBase extends Transform { let maxOutputLength = kMaxLength; let flushBoundIdx; - if (mode !== CONST_BROTLI_ENCODE && mode !== CONST_BROTLI_DECODE) { - flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; - } else { + if (mode === CONST_BROTLI_ENCODE || mode === CONST_BROTLI_DECODE) { flushBoundIdx = FLUSH_BOUND_IDX_BROTLI; + } else if (mode === CONST_ZSTD_ENCODE || mode === CONST_ZSTD_DECODE) { + flushBoundIdx = FLUSH_BOUND_IDX_ZSTD; + } else { + flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; } /* eslint-disable-next-line @typescript-eslint/no-unnecessary-condition */ @@ -798,3 +811,70 @@ export class Brotli extends ZlibBase { this._writeState = _writeState; } } + +const kMaxZstdParam = Math.max( + ...Object.entries(constants).map(([key, value]) => + key.startsWith('ZSTD_c_') || key.startsWith('ZSTD_d_') ? value : 0 + ) +); +const zstdInitParamsArray = new Int32Array(kMaxZstdParam + 1); +const zstdDefaultOptions: ZlibDefaultOptions = { + flush: CONST_ZSTD_e_continue, + finishFlush: CONST_ZSTD_e_end, + fullFlush: CONST_ZSTD_e_flush, +}; + +export class Zstd extends ZlibBase { + constructor(options: ZstdOptions | undefined | null, mode: number) { + ok(mode === CONST_ZSTD_DECODE || mode === CONST_ZSTD_ENCODE); + zstdInitParamsArray.fill(-1); + + if (options?.params) { + for (const [origKey, value] of Object.entries(options.params)) { + const key = +origKey; + if ( + Number.isNaN(key) || + key < 0 || + key > kMaxZstdParam || + ((zstdInitParamsArray[key] as number) | 0) !== -1 + ) { + throw new ERR_ZSTD_INVALID_PARAM(origKey); + } + + if (typeof value !== 'number' && typeof value !== 'boolean') { + throw new ERR_INVALID_ARG_TYPE( + 'options.params[key]', + 'number', + value + ); + } + // as number is required to avoid force type coercion on runtime. + // boolean has number representation, but typescript doesn't understand it. + zstdInitParamsArray[key] = value as number; + } + } + + const handle = + mode === CONST_ZSTD_DECODE + ? new zlibUtil.ZstdDecoder(mode) + : new zlibUtil.ZstdEncoder(mode); + + const _writeState = new Uint32Array(2); + + const pledgedSrcSize = options?.pledgedSrcSize; + + if ( + !handle.initialize( + zstdInitParamsArray, + _writeState, + processCallback.bind(handle), + pledgedSrcSize + ) + ) { + throw new ERR_ZLIB_INITIALIZATION_FAILED(); + } + + super(options ?? {}, mode, handle, zstdDefaultOptions); + this._writeState = _writeState; + } +} diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index f0e9e301867..3bbc60ed28c 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -36,6 +36,26 @@ export function brotliCompress( cb: InternalCompressCallback ): void; +export function zstdDecompressSync( + data: ArrayBufferView | string, + options: ZstdOptions +): ArrayBuffer; +export function zstdDecompress( + data: ArrayBufferView | string, + options: ZstdOptions, + cb: InternalCompressCallback +): void; + +export function zstdCompressSync( + data: ArrayBufferView | string, + options: ZstdOptions +): ArrayBuffer; +export function zstdCompress( + data: ArrayBufferView | string, + options: ZstdOptions, + cb: InternalCompressCallback +): void; + // zlib.constants (part of the API contract for node:zlib) export const CONST_Z_NO_FLUSH: number; export const CONST_Z_PARTIAL_FLUSH: number; @@ -74,6 +94,8 @@ export const CONST_INFLATERAW: number; export const CONST_UNZIP: number; export const CONST_BROTLI_DECODE: number; export const CONST_BROTLI_ENCODE: number; +export const CONST_ZSTD_ENCODE: number; +export const CONST_ZSTD_DECODE: number; export const CONST_Z_MIN_WINDOWBITS: number; export const CONST_Z_MAX_WINDOWBITS: number; @@ -150,6 +172,49 @@ export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: number; export const CONST_BROTLI_DECODER_ERROR_UNREACHABLE: number; +// Zstd flush directives +export const CONST_ZSTD_e_continue: number; +export const CONST_ZSTD_e_flush: number; +export const CONST_ZSTD_e_end: number; + +// Zstd compression parameters +export const CONST_ZSTD_c_compressionLevel: number; +export const CONST_ZSTD_c_windowLog: number; +export const CONST_ZSTD_c_hashLog: number; +export const CONST_ZSTD_c_chainLog: number; +export const CONST_ZSTD_c_searchLog: number; +export const CONST_ZSTD_c_minMatch: number; +export const CONST_ZSTD_c_targetLength: number; +export const CONST_ZSTD_c_strategy: number; +export const CONST_ZSTD_c_enableLongDistanceMatching: number; +export const CONST_ZSTD_c_ldmHashLog: number; +export const CONST_ZSTD_c_ldmMinMatch: number; +export const CONST_ZSTD_c_ldmBucketSizeLog: number; +export const CONST_ZSTD_c_ldmHashRateLog: number; +export const CONST_ZSTD_c_contentSizeFlag: number; +export const CONST_ZSTD_c_checksumFlag: number; +export const CONST_ZSTD_c_dictIDFlag: number; +export const CONST_ZSTD_c_nbWorkers: number; +export const CONST_ZSTD_c_jobSize: number; +export const CONST_ZSTD_c_overlapLog: number; + +// Zstd decompression parameters +export const CONST_ZSTD_d_windowLogMax: number; + +// Zstd strategy constants +export const CONST_ZSTD_fast: number; +export const CONST_ZSTD_dfast: number; +export const CONST_ZSTD_greedy: number; +export const CONST_ZSTD_lazy: number; +export const CONST_ZSTD_lazy2: number; +export const CONST_ZSTD_btlazy2: number; +export const CONST_ZSTD_btopt: number; +export const CONST_ZSTD_btultra: number; +export const CONST_ZSTD_btultra2: number; + +// Zstd default compression level +export const CONST_ZSTD_CLEVEL_DEFAULT: number; + export interface ZlibOptions { flush?: number | undefined; finishFlush?: number | undefined; @@ -177,6 +242,21 @@ export interface BrotliOptions { info?: boolean | undefined; } +export interface ZstdOptions { + flush?: number | undefined; + finishFlush?: number | undefined; + chunkSize?: number | undefined; + params?: + | { + [key: number]: boolean | number; + } + | undefined; + maxOutputLength?: number | undefined; + pledgedSrcSize?: number | undefined; + // Not specified in NodeJS docs but the tests expect it + info?: boolean | undefined; +} + type ErrorHandler = (errno: number, code: string, message: string) => void; type ProcessHandler = () => void; @@ -246,3 +326,23 @@ export class BrotliEncoder extends CompressionStream { ): boolean; params(): void; } + +export class ZstdDecoder extends CompressionStream { + initialize( + params: Int32Array, + writeResult: Uint32Array, + writeCallback: () => void, + pledgedSrcSize?: number + ): boolean; + params(): void; +} + +export class ZstdEncoder extends CompressionStream { + initialize( + params: Int32Array, + writeResult: Uint32Array, + writeCallback: () => void, + pledgedSrcSize?: number + ): boolean; + params(): void; +} diff --git a/src/node/zlib.ts b/src/node/zlib.ts index 946bf339c79..f1c08ddcf80 100644 --- a/src/node/zlib.ts +++ b/src/node/zlib.ts @@ -46,6 +46,8 @@ const { UNZIP, BROTLI_DECODE, BROTLI_ENCODE, + ZSTD_ENCODE, + ZSTD_DECODE, Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, @@ -141,6 +143,8 @@ const InflateRaw = protectMethod(zlib.InflateRaw); const Unzip = protectMethod(zlib.Unzip); const BrotliCompress = protectMethod(zlib.BrotliCompress); const BrotliDecompress = protectMethod(zlib.BrotliDecompress); +const ZstdCompress = protectMethod(zlib.ZstdCompress); +const ZstdDecompress = protectMethod(zlib.ZstdDecompress); const createGzip = protectMethod(zlib.createGzip); const createGunzip = protectMethod(zlib.createGunzip); @@ -151,6 +155,8 @@ const createInflateRaw = protectMethod(zlib.createInflateRaw); const createUnzip = protectMethod(zlib.createUnzip); const createBrotliCompress = protectMethod(zlib.createBrotliCompress); const createBrotliDecompress = protectMethod(zlib.createBrotliDecompress); +const createZstdCompress = protectMethod(zlib.createZstdCompress); +const createZstdDecompress = protectMethod(zlib.createZstdDecompress); const inflate = protectMethod(zlib.inflate); const inflateSync = protectMethod(zlib.inflateSync); @@ -170,6 +176,10 @@ const brotliCompress = protectMethod(zlib.brotliCompress); const brotliCompressSync = protectMethod(zlib.brotliCompressSync); const brotliDecompress = protectMethod(zlib.brotliDecompress); const brotliDecompressSync = protectMethod(zlib.brotliDecompressSync); +const zstdCompress = protectMethod(zlib.zstdCompress); +const zstdCompressSync = protectMethod(zlib.zstdCompressSync); +const zstdDecompress = protectMethod(zlib.zstdDecompress); +const zstdDecompressSync = protectMethod(zlib.zstdDecompressSync); export { crc32, @@ -186,6 +196,8 @@ export { Unzip, BrotliCompress, BrotliDecompress, + ZstdCompress, + ZstdDecompress, // Convenience methods to create classes createGzip, @@ -197,6 +209,8 @@ export { createUnzip, createBrotliCompress, createBrotliDecompress, + createZstdCompress, + createZstdDecompress, // One-shot methods inflate, @@ -217,6 +231,10 @@ export { brotliDecompressSync, brotliCompress, brotliCompressSync, + zstdCompress, + zstdCompressSync, + zstdDecompress, + zstdDecompressSync, // NodeJS also exports all constants directly under zlib, but this is deprecated Z_NO_FLUSH, @@ -253,6 +271,8 @@ export { UNZIP, BROTLI_DECODE, BROTLI_ENCODE, + ZSTD_ENCODE, + ZSTD_DECODE, Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_DEFAULT_WINDOWBITS, @@ -344,6 +364,8 @@ export default { Unzip, BrotliCompress, BrotliDecompress, + ZstdCompress, + ZstdDecompress, // Convenience methods to create classes createGzip, @@ -355,6 +377,8 @@ export default { createUnzip, createBrotliCompress, createBrotliDecompress, + createZstdCompress, + createZstdDecompress, // One-shot methods inflate, @@ -375,4 +399,8 @@ export default { brotliDecompressSync, brotliCompress, brotliCompressSync, + zstdCompress, + zstdCompressSync, + zstdDecompress, + zstdDecompressSync, }; diff --git a/src/workerd/api/node/BUILD.bazel b/src/workerd/api/node/BUILD.bazel index b756602cfef..3f3b3de96f1 100644 --- a/src/workerd/api/node/BUILD.bazel +++ b/src/workerd/api/node/BUILD.bazel @@ -52,6 +52,7 @@ wd_cc_library( "//src/workerd/util:mimetype", "@capnp-cpp//src/kj/compat:kj-brotli", "@ncrypto", + "@zstd", ], ) diff --git a/src/workerd/api/node/tests/BUILD.bazel b/src/workerd/api/node/tests/BUILD.bazel index 110a4f596e4..ed9b7286b2c 100644 --- a/src/workerd/api/node/tests/BUILD.bazel +++ b/src/workerd/api/node/tests/BUILD.bazel @@ -216,6 +216,13 @@ wd_test( data = ["zlib-nodejs-test.js"], ) +wd_test( + size = "large", + src = "zstd-nodejs-test.wd-test", + args = ["--experimental"], + data = ["zstd-nodejs-test.js"], +) + wd_test( src = "module-nodejs-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.js b/src/workerd/api/node/tests/zlib-nodejs-test.js index 1f8494ce3e8..5386213c810 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-nodejs-test.js @@ -333,6 +333,41 @@ export const constantsTest = { 'INFLATERAW', 'UNZIP', 'ZLIB_VERNUM', + 'ZSTD_CLEVEL_DEFAULT', + 'ZSTD_DECODE', + 'ZSTD_ENCODE', + 'ZSTD_btlazy2', + 'ZSTD_btopt', + 'ZSTD_btultra', + 'ZSTD_btultra2', + 'ZSTD_c_chainLog', + 'ZSTD_c_checksumFlag', + 'ZSTD_c_compressionLevel', + 'ZSTD_c_contentSizeFlag', + 'ZSTD_c_dictIDFlag', + 'ZSTD_c_enableLongDistanceMatching', + 'ZSTD_c_hashLog', + 'ZSTD_c_jobSize', + 'ZSTD_c_ldmBucketSizeLog', + 'ZSTD_c_ldmHashLog', + 'ZSTD_c_ldmHashRateLog', + 'ZSTD_c_ldmMinMatch', + 'ZSTD_c_minMatch', + 'ZSTD_c_nbWorkers', + 'ZSTD_c_overlapLog', + 'ZSTD_c_searchLog', + 'ZSTD_c_strategy', + 'ZSTD_c_targetLength', + 'ZSTD_c_windowLog', + 'ZSTD_d_windowLogMax', + 'ZSTD_dfast', + 'ZSTD_e_continue', + 'ZSTD_e_end', + 'ZSTD_e_flush', + 'ZSTD_fast', + 'ZSTD_greedy', + 'ZSTD_lazy', + 'ZSTD_lazy2', 'Z_BEST_COMPRESSION', 'Z_BEST_SPEED', 'Z_BLOCK', diff --git a/src/workerd/api/node/tests/zstd-nodejs-test.js b/src/workerd/api/node/tests/zstd-nodejs-test.js new file mode 100644 index 00000000000..33864af1d88 --- /dev/null +++ b/src/workerd/api/node/tests/zstd-nodejs-test.js @@ -0,0 +1,313 @@ +import assert from 'node:assert'; +import { Buffer } from 'node:buffer'; +import { promisify } from 'node:util'; +import zlib from 'node:zlib'; + +// Helper function to promisify callbacks +const zstdCompressAsync = promisify(zlib.zstdCompress); +const zstdDecompressAsync = promisify(zlib.zstdDecompress); + +// Basic sync compress/decompress test +export const zstdBasicSyncTest = { + test() { + const input = Buffer.from('Hello, Zstd compression!'); + const compressed = zlib.zstdCompressSync(input); + assert(Buffer.isBuffer(compressed), 'Compressed output should be a buffer'); + assert(compressed.length > 0, 'Compressed output should not be empty'); + + const decompressed = zlib.zstdDecompressSync(compressed); + assert(Buffer.isBuffer(decompressed), 'Decompressed output should be a buffer'); + assert.strictEqual(decompressed.toString(), input.toString(), 'Round-trip should match'); + }, +}; + +// Basic async compress/decompress test +export const zstdBasicAsyncTest = { + async test() { + const input = Buffer.from('Hello, async Zstd compression!'); + const compressed = await zstdCompressAsync(input); + assert(Buffer.isBuffer(compressed), 'Compressed output should be a buffer'); + assert(compressed.length > 0, 'Compressed output should not be empty'); + + const decompressed = await zstdDecompressAsync(compressed); + assert(Buffer.isBuffer(decompressed), 'Decompressed output should be a buffer'); + assert.strictEqual(decompressed.toString(), input.toString(), 'Round-trip should match'); + }, +}; + +// Test with string input +export const zstdStringInputTest = { + test() { + const input = 'This is a string input for Zstd compression'; + const compressed = zlib.zstdCompressSync(input); + const decompressed = zlib.zstdDecompressSync(compressed); + assert.strictEqual(decompressed.toString(), input, 'String input round-trip should match'); + }, +}; + +// Test with larger data +export const zstdLargeDataTest = { + test() { + // Create a 100KB buffer with repetitive content (compresses well) + const input = Buffer.alloc(100 * 1024); + for (let i = 0; i < input.length; i++) { + input[i] = i % 256; + } + + const compressed = zlib.zstdCompressSync(input); + assert(compressed.length < input.length, 'Compressed should be smaller than input'); + + const decompressed = zlib.zstdDecompressSync(compressed); + assert(input.equals(decompressed), 'Large data round-trip should match'); + }, +}; + +// Test with different compression levels +export const zstdCompressionLevelsTest = { + test() { + const input = Buffer.from('Test data for compression level testing'.repeat(100)); + + // Test compression level 1 (fastest) + const compressedFast = zlib.zstdCompressSync(input, { + params: { [zlib.constants.ZSTD_c_compressionLevel]: 1 }, + }); + + // Test compression level 19 (best compression) + const compressedBest = zlib.zstdCompressSync(input, { + params: { [zlib.constants.ZSTD_c_compressionLevel]: 19 }, + }); + + // Both should decompress correctly + const decompressedFast = zlib.zstdDecompressSync(compressedFast); + const decompressedBest = zlib.zstdDecompressSync(compressedBest); + + assert(input.equals(decompressedFast), 'Fast compression should decompress correctly'); + assert(input.equals(decompressedBest), 'Best compression should decompress correctly'); + + // Higher compression level should typically produce smaller output + assert( + compressedBest.length <= compressedFast.length, + 'Higher compression level should produce smaller or equal output' + ); + }, +}; + +// Test with default compression level +export const zstdDefaultCompressionTest = { + test() { + const input = Buffer.from('Testing default compression level'); + const compressed = zlib.zstdCompressSync(input); + const decompressed = zlib.zstdDecompressSync(compressed); + assert.strictEqual( + decompressed.toString(), + input.toString(), + 'Default compression should work' + ); + }, +}; + +// Test stream API +export const zstdStreamTest = { + async test() { + const input = Buffer.from('Stream compression test data'.repeat(50)); + + // Create compress stream + const compress = zlib.createZstdCompress(); + const decompress = zlib.createZstdDecompress(); + + const chunks = []; + + // Pipe through compress + compress.on('data', (chunk) => chunks.push(chunk)); + + await new Promise((resolve, reject) => { + compress.on('end', resolve); + compress.on('error', reject); + compress.end(input); + }); + + const compressed = Buffer.concat(chunks); + assert(compressed.length > 0, 'Stream should produce output'); + + // Decompress + const decompressedChunks = []; + decompress.on('data', (chunk) => decompressedChunks.push(chunk)); + + await new Promise((resolve, reject) => { + decompress.on('end', resolve); + decompress.on('error', reject); + decompress.end(compressed); + }); + + const decompressed = Buffer.concat(decompressedChunks); + assert(input.equals(decompressed), 'Stream round-trip should match'); + }, +}; + +// Test empty input +export const zstdEmptyInputTest = { + test() { + const input = Buffer.alloc(0); + const compressed = zlib.zstdCompressSync(input); + const decompressed = zlib.zstdDecompressSync(compressed); + assert.strictEqual(decompressed.length, 0, 'Empty input should produce empty output'); + }, +}; + +// Test constants are exported correctly +export const zstdConstantsTest = { + test() { + // Flush directives + assert.strictEqual(typeof zlib.constants.ZSTD_e_continue, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_e_flush, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_e_end, 'number'); + + // Compression parameters + assert.strictEqual(typeof zlib.constants.ZSTD_c_compressionLevel, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_windowLog, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_hashLog, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_chainLog, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_searchLog, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_minMatch, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_targetLength, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_c_strategy, 'number'); + + // Decompression parameters + assert.strictEqual(typeof zlib.constants.ZSTD_d_windowLogMax, 'number'); + + // Strategy constants + assert.strictEqual(typeof zlib.constants.ZSTD_fast, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_dfast, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_greedy, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_lazy, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_lazy2, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_btlazy2, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_btopt, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_btultra, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_btultra2, 'number'); + + // Default compression level + assert.strictEqual(typeof zlib.constants.ZSTD_CLEVEL_DEFAULT, 'number'); + + // Mode constants + assert.strictEqual(typeof zlib.constants.ZSTD_ENCODE, 'number'); + assert.strictEqual(typeof zlib.constants.ZSTD_DECODE, 'number'); + }, +}; + +// Test invalid compressed data +export const zstdInvalidDataTest = { + test() { + const invalidData = Buffer.from('This is not valid zstd compressed data'); + assert.throws( + () => zlib.zstdDecompressSync(invalidData), + /Zstd decompression failed/, + 'Should throw on invalid compressed data' + ); + }, +}; + +// Test chunkSize option +export const zstdChunkSizeTest = { + test() { + const input = Buffer.from('Testing chunk size option'.repeat(100)); + const compressed = zlib.zstdCompressSync(input, { chunkSize: 1024 }); + const decompressed = zlib.zstdDecompressSync(compressed, { chunkSize: 1024 }); + assert(input.equals(decompressed), 'Custom chunkSize should work correctly'); + }, +}; + +// Test maxOutputLength option +export const zstdMaxOutputLengthTest = { + test() { + const input = Buffer.from('A'.repeat(1000)); + const compressed = zlib.zstdCompressSync(input); + + // Try to decompress with a maxOutputLength that's too small + assert.throws( + () => zlib.zstdDecompressSync(compressed, { maxOutputLength: 10 }), + /Memory limit exceeded/, + 'Should throw when maxOutputLength is exceeded' + ); + }, +}; + +// Test callback error handling +export const zstdCallbackErrorTest = { + async test() { + const invalidData = Buffer.from('invalid zstd data'); + + try { + await zstdDecompressAsync(invalidData); + assert.fail('Should have thrown'); + } catch (err) { + assert(err instanceof Error, 'Should receive an error'); + } + }, +}; + +// Test with params for strategy +export const zstdStrategyTest = { + test() { + const input = Buffer.from('Testing compression strategy'.repeat(50)); + + // Test with ZSTD_fast strategy + const compressedFast = zlib.zstdCompressSync(input, { + params: { + [zlib.constants.ZSTD_c_strategy]: zlib.constants.ZSTD_fast, + }, + }); + + const decompressed = zlib.zstdDecompressSync(compressedFast); + assert(input.equals(decompressed), 'Strategy option should work correctly'); + }, +}; + +// Test with info option +export const zstdInfoOptionTest = { + test() { + const input = Buffer.from('Testing info option'); + const result = zlib.zstdCompressSync(input, { info: true }); + + // When info is true, result should be an object with buffer and engine properties + assert(typeof result === 'object', 'Result should be an object when info is true'); + assert(result.buffer, 'Result should have a buffer property'); + assert(result.engine, 'Result should have an engine property'); + }, +}; + +// Test classes are exported +export const zstdClassesExportedTest = { + test() { + assert.strictEqual(typeof zlib.ZstdCompress, 'function', 'ZstdCompress should be exported'); + assert.strictEqual(typeof zlib.ZstdDecompress, 'function', 'ZstdDecompress should be exported'); + assert.strictEqual( + typeof zlib.createZstdCompress, + 'function', + 'createZstdCompress should be exported' + ); + assert.strictEqual( + typeof zlib.createZstdDecompress, + 'function', + 'createZstdDecompress should be exported' + ); + }, +}; + +// Test sync functions are exported +export const zstdSyncFunctionsExportedTest = { + test() { + assert.strictEqual( + typeof zlib.zstdCompressSync, + 'function', + 'zstdCompressSync should be exported' + ); + assert.strictEqual( + typeof zlib.zstdDecompressSync, + 'function', + 'zstdDecompressSync should be exported' + ); + assert.strictEqual(typeof zlib.zstdCompress, 'function', 'zstdCompress should be exported'); + assert.strictEqual(typeof zlib.zstdDecompress, 'function', 'zstdDecompress should be exported'); + }, +}; diff --git a/src/workerd/api/node/tests/zstd-nodejs-test.wd-test b/src/workerd/api/node/tests/zstd-nodejs-test.wd-test new file mode 100644 index 00000000000..ee04da3d80b --- /dev/null +++ b/src/workerd/api/node/tests/zstd-nodejs-test.wd-test @@ -0,0 +1,14 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "nodejs-zstd-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "zstd-nodejs-test.js") + ], + compatibilityFlags = ["experimental", "nodejs_compat", "nodejs_compat_v2", "nodejs_zlib"], + ) + ), + ], +); diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index 95f85e44c6c..c4179bfbc72 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -755,6 +755,238 @@ kj::Maybe BrotliDecoderContext::getError() const { return kj::none; } +// ======================================================================================= +// Zstd Implementation + +void ZstdContext::setBuffers(kj::ArrayPtr input, kj::ArrayPtr output) { + input_.src = input.begin(); + input_.size = input.size(); + input_.pos = 0; + output_.dst = output.begin(); + output_.size = output.size(); + output_.pos = 0; +} + +void ZstdContext::setInputBuffer(kj::ArrayPtr input) { + input_.src = input.begin(); + input_.size = input.size(); + input_.pos = 0; +} + +void ZstdContext::setOutputBuffer(kj::ArrayPtr output) { + output_.dst = output.begin(); + output_.size = output.size(); + output_.pos = 0; +} + +void ZstdContext::setFlush(int flush) { + flush_ = static_cast(flush); +} + +kj::uint ZstdContext::getAvailOut() const { + return output_.size - output_.pos; +} + +void ZstdContext::getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const { + *availIn = input_.size - input_.pos; + *availOut = output_.size - output_.pos; +} + +ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode): ZstdContext(_mode) { + cctx_ = ZSTD_createCCtx(); +} + +ZstdEncoderContext::~ZstdEncoderContext() { + if (cctx_ != nullptr) { + ZSTD_freeCCtx(cctx_); + cctx_ = nullptr; + } +} + +kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSize) { + if (cctx_ == nullptr) { + cctx_ = ZSTD_createCCtx(); + } + + if (cctx_ == nullptr) { + return CompressionError( + "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); + } + + if (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN) { + size_t result = ZSTD_CCtx_setPledgedSrcSize(cctx_, pledgedSrcSize); + if (ZSTD_isError(result)) { + error_ = ZSTD_getErrorCode(result); + return CompressionError(kj::str(ZSTD_getErrorName(result)), + kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); + } + } + + return kj::none; +} + +void ZstdEncoderContext::work() { + JSG_REQUIRE(mode == ZlibMode::ZSTD_ENCODE, Error, "Mode should be ZSTD_ENCODE"_kj); + JSG_REQUIRE(cctx_ != nullptr, Error, "Zstd context should not be null"_kj); + + lastResult = ZSTD_compressStream2(cctx_, &output_, &input_, flush_); + + if (ZSTD_isError(lastResult)) { + error_ = ZSTD_getErrorCode(lastResult); + } +} + +kj::Maybe ZstdEncoderContext::resetStream() { + if (cctx_ != nullptr) { + size_t result = ZSTD_CCtx_reset(cctx_, ZSTD_reset_session_only); + if (ZSTD_isError(result)) { + error_ = ZSTD_getErrorCode(result); + return CompressionError(kj::str(ZSTD_getErrorName(result)), + kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); + } + } + return kj::none; +} + +kj::Maybe ZstdEncoderContext::setParams(int key, int value) { + size_t result = ZSTD_CCtx_setParameter(cctx_, static_cast(key), value); + if (ZSTD_isError(result)) { + return CompressionError( + kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), + "ERR_ZSTD_PARAM_SET_FAILED"_kj, -1); + } + return kj::none; +} + +kj::Maybe ZstdEncoderContext::getError() const { + if (error_ != ZSTD_error_no_error) { + return CompressionError(kj::str("Zstd compression failed: ", ZSTD_getErrorString(error_)), + kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); + } + + if (flush_ == ZSTD_e_end && lastResult != 0) { + // lastResult > 0 means more output is needed, which shouldn't happen at end + return CompressionError("Unexpected end of file"_kj, "Z_BUF_ERROR"_kj, Z_BUF_ERROR); + } + + return kj::none; +} + +ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode): ZstdContext(_mode) { + dctx_ = ZSTD_createDCtx(); +} + +ZstdDecoderContext::~ZstdDecoderContext() { + if (dctx_ != nullptr) { + ZSTD_freeDCtx(dctx_); + dctx_ = nullptr; + } +} + +kj::Maybe ZstdDecoderContext::initialize() { + if (dctx_ == nullptr) { + dctx_ = ZSTD_createDCtx(); + } + + if (dctx_ == nullptr) { + return CompressionError( + "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); + } + + return kj::none; +} + +void ZstdDecoderContext::work() { + JSG_REQUIRE(mode == ZlibMode::ZSTD_DECODE, Error, "Mode should be ZSTD_DECODE"_kj); + JSG_REQUIRE(dctx_ != nullptr, Error, "Zstd context should not be null"_kj); + + lastResult = ZSTD_decompressStream(dctx_, &output_, &input_); + + if (ZSTD_isError(lastResult)) { + error_ = ZSTD_getErrorCode(lastResult); + } +} + +kj::Maybe ZstdDecoderContext::resetStream() { + if (dctx_ != nullptr) { + size_t result = ZSTD_DCtx_reset(dctx_, ZSTD_reset_session_only); + if (ZSTD_isError(result)) { + error_ = ZSTD_getErrorCode(result); + return CompressionError(kj::str(ZSTD_getErrorName(result)), + kj::str("ERR_ZSTD_DECOMPRESSION_FAILED"), -1); + } + } + return kj::none; +} + +kj::Maybe ZstdDecoderContext::setParams(int key, int value) { + size_t result = ZSTD_DCtx_setParameter(dctx_, static_cast(key), value); + if (ZSTD_isError(result)) { + return CompressionError( + kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), + "ERR_ZSTD_PARAM_SET_FAILED"_kj, -1); + } + return kj::none; +} + +kj::Maybe ZstdDecoderContext::getError() const { + if (error_ != ZSTD_error_no_error) { + return CompressionError(kj::str("Zstd decompression failed: ", ZSTD_getErrorString(error_)), + kj::str("ERR_ZSTD_DECOMPRESSION_FAILED"), -1); + } + + // For decompression, lastResult == 0 means frame is complete + // If we have flush_ == ZSTD_e_end equivalent and there's input left, that's an error + if (flush_ == ZSTD_e_end && input_.pos < input_.size && lastResult == 0) { + // Frame completed but there's still input - could be trailing data + } + + return kj::none; +} + +template +jsg::Ref> ZlibUtil::ZstdCompressionStream< + CompressionContext>::constructor(jsg::Lock& js, ZlibModeValue mode) { + return js.alloc(static_cast(mode), js.getExternalMemoryTarget()); +} + +template +bool ZlibUtil::ZstdCompressionStream::initialize(jsg::Lock& js, + jsg::BufferSource params, + jsg::BufferSource writeResult, + jsg::Function writeCallback, + jsg::Optional pledgedSrcSize) { + this->initializeStream(kj::mv(writeResult), kj::mv(writeCallback)); + + uint64_t srcSize = pledgedSrcSize.orDefault(ZSTD_CONTENTSIZE_UNKNOWN); + + kj::Maybe maybeError; + if constexpr (CompressionContext::Mode == ZlibMode::ZSTD_ENCODE) { + maybeError = this->context()->initialize(srcSize); + } else { + maybeError = this->context()->initialize(); + } + + KJ_IF_SOME(err, maybeError) { + this->emitError(js, kj::mv(err)); + return false; + } + + auto results = params.template asArrayPtr(); + + for (size_t i = 0; i < results.size(); i++) { + if (results[i] == -1) { + continue; + } + + KJ_IF_SOME(err, this->context()->setParams(i, results[i])) { + this->emitError(js, kj::mv(err)); + return false; + } + } + return true; +} + template jsg::Ref> ZlibUtil::BrotliCompressionStream< CompressionContext>::constructor(jsg::Lock& js, ZlibModeValue mode) { @@ -934,6 +1166,72 @@ void ZlibUtil::brotliWithCallback( cb(js, kj::mv(res)); } +template +kj::Array ZlibUtil::zstdSync( + jsg::Lock& js, InputSource data, ZstdContext::Options opts) { + Context ctx(Context::Mode); + + auto chunkSize = opts.chunkSize.orDefault(ZLIB_PERFORMANT_CHUNK_SIZE); + auto maxOutputLength = opts.maxOutputLength.orDefault(Z_MAX_CHUNK); + + JSG_REQUIRE(Z_MIN_CHUNK <= chunkSize && chunkSize <= Z_MAX_CHUNK, RangeError, + kj::str("The value of \"options.chunkSize\" is out of range. It must be >= ", Z_MIN_CHUNK, + " and <= ", Z_MAX_CHUNK, ". Received ", chunkSize)); + JSG_REQUIRE(maxOutputLength >= 1 && maxOutputLength <= Z_MAX_CHUNK, RangeError, + kj::str("The value of \"options.maxOutputLength\" is out of range. It must be >= 1 and <= ", + Z_MAX_CHUNK, ". Received ", maxOutputLength)); + GrowableBuffer result(ZLIB_PERFORMANT_CHUNK_SIZE, maxOutputLength); + + // Initialize the context + if constexpr (Context::Mode == ZlibMode::ZSTD_ENCODE) { + uint64_t pledgedSrcSize = opts.pledgedSrcSize.orDefault(ZSTD_CONTENTSIZE_UNKNOWN); + KJ_IF_SOME(err, ctx.initialize(pledgedSrcSize)) { + JSG_FAIL_REQUIRE(Error, err.message); + } + } else { + KJ_IF_SOME(err, ctx.initialize()) { + JSG_FAIL_REQUIRE(Error, err.message); + } + } + + // Set parameters + KJ_IF_SOME(params, opts.params) { + for (const auto& field: params.fields) { + KJ_IF_SOME(err, ctx.setParams(field.name.parseAs(), field.value)) { + JSG_FAIL_REQUIRE(Error, err.message); + } + } + } + + auto flush = opts.flush.orDefault(ZSTD_e_continue); + JSG_REQUIRE(ZSTD_e_continue <= flush && flush <= ZSTD_e_end, RangeError, + kj::str("The value of \"options.flush\" is out of range. It must be >= ", ZSTD_e_continue, + " and <= ", ZSTD_e_end, ". Received ", flush)); + + auto finishFlush = opts.finishFlush.orDefault(ZSTD_e_end); + JSG_REQUIRE(ZSTD_e_continue <= finishFlush && finishFlush <= ZSTD_e_end, RangeError, + kj::str("The value of \"options.finishFlush\" is out of range. It must be >= ", + ZSTD_e_continue, " and <= ", ZSTD_e_end, ". Received ", finishFlush)); + + ctx.setFlush(finishFlush); + ctx.setInputBuffer(getInputFromSource(data)); + return syncProcessBuffer(ctx, result); +} + +template +void ZlibUtil::zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb) { + // Capture only relevant errors so they can be passed to the callback + auto res = js.tryCatch([&]() { + return CompressCallbackArg(zstdSync(js, kj::mv(data), kj::mv(options))); + }, [&](jsg::Value&& exception) { + return CompressCallbackArg(jsg::JsValue(exception.getHandle(js))); + }); + + // Ensure callback is invoked only once + cb(js, kj::mv(res)); +} + #ifndef CREATE_TEMPLATE #define CREATE_TEMPLATE(T) \ template class ZlibUtil::CompressionStream; \ @@ -947,10 +1245,15 @@ void ZlibUtil::brotliWithCallback( CREATE_TEMPLATE(ZlibContext) CREATE_TEMPLATE(BrotliEncoderContext) CREATE_TEMPLATE(BrotliDecoderContext) +CREATE_TEMPLATE(ZstdEncoderContext) +CREATE_TEMPLATE(ZstdDecoderContext) template class ZlibUtil::BrotliCompressionStream; template class ZlibUtil::BrotliCompressionStream; +template class ZlibUtil::ZstdCompressionStream; +template class ZlibUtil::ZstdCompressionStream; + template kj::Array ZlibUtil::brotliSync( jsg::Lock& js, InputSource data, BrotliContext::Options opts); template kj::Array ZlibUtil::brotliSync( @@ -959,6 +1262,15 @@ template void ZlibUtil::brotliWithCallback( jsg::Lock& js, InputSource data, BrotliContext::Options options, CompressCallback cb); template void ZlibUtil::brotliWithCallback( jsg::Lock& js, InputSource data, BrotliContext::Options options, CompressCallback cb); + +template kj::Array ZlibUtil::zstdSync( + jsg::Lock& js, InputSource data, ZstdContext::Options opts); +template kj::Array ZlibUtil::zstdSync( + jsg::Lock& js, InputSource data, ZstdContext::Options opts); +template void ZlibUtil::zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb); +template void ZlibUtil::zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb); #undef CREATE_TEMPLATE #endif } // namespace workerd::api::node diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index ca5f2a63dbf..425bc98c5f0 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include @@ -71,7 +73,9 @@ enum class ZlibMode : ZlibModeValue { INFLATERAW, UNZIP, BROTLI_DECODE, - BROTLI_ENCODE + BROTLI_ENCODE, + ZSTD_ENCODE, + ZSTD_DECODE }; // When possible, we intentionally override chunkSize to a value that is likely to perform better @@ -283,6 +287,76 @@ class BrotliDecoderContext final: public BrotliContext { kj::Own state; }; +class ZstdContext { + public: + explicit ZstdContext(ZlibMode _mode): mode(_mode) {} + KJ_DISALLOW_COPY(ZstdContext); + + void setBuffers(kj::ArrayPtr input, kj::ArrayPtr output); + void setInputBuffer(kj::ArrayPtr input); + void setOutputBuffer(kj::ArrayPtr output); + void setFlush(int flush); + kj::uint getAvailOut() const; + void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const; + void setMode(ZlibMode _mode) { + mode = _mode; + } + + struct Options { + jsg::Optional flush; + jsg::Optional finishFlush; + jsg::Optional chunkSize; + jsg::Optional> params; + jsg::Optional maxOutputLength; + jsg::Optional pledgedSrcSize; + JSG_STRUCT(flush, finishFlush, chunkSize, params, maxOutputLength, pledgedSrcSize); + }; + + protected: + ZlibMode mode; + ZSTD_inBuffer input_{nullptr, 0, 0}; + ZSTD_outBuffer output_{nullptr, 0, 0}; + ZSTD_EndDirective flush_ = ZSTD_e_continue; +}; + +class ZstdEncoderContext final: public ZstdContext { + public: + static const ZlibMode Mode = ZlibMode::ZSTD_ENCODE; + explicit ZstdEncoderContext(ZlibMode _mode); + ~ZstdEncoderContext(); + KJ_DISALLOW_COPY_AND_MOVE(ZstdEncoderContext); + + void work(); + kj::Maybe initialize(uint64_t pledgedSrcSize); + kj::Maybe resetStream(); + kj::Maybe setParams(int key, int value); + kj::Maybe getError() const; + + private: + size_t lastResult = 0; + ZSTD_CCtx* cctx_ = nullptr; + ZSTD_ErrorCode error_ = ZSTD_error_no_error; +}; + +class ZstdDecoderContext final: public ZstdContext { + public: + static const ZlibMode Mode = ZlibMode::ZSTD_DECODE; + explicit ZstdDecoderContext(ZlibMode _mode); + ~ZstdDecoderContext(); + KJ_DISALLOW_COPY_AND_MOVE(ZstdDecoderContext); + + void work(); + kj::Maybe initialize(); + kj::Maybe resetStream(); + kj::Maybe setParams(int key, int value); + kj::Maybe getError() const; + + private: + size_t lastResult = 0; + ZSTD_DCtx* dctx_ = nullptr; + ZSTD_ErrorCode error_ = ZSTD_error_no_error; +}; + // Implements utilities in support of the Node.js Zlib class ZlibUtil final: public jsg::Object { public: @@ -417,6 +491,37 @@ class ZlibUtil final: public jsg::Object { } }; + template + class ZstdCompressionStream: public CompressionStream { + public: + explicit ZstdCompressionStream( + ZlibMode _mode, kj::Arc&& externalMemoryTarget) + : CompressionStream(_mode, kj::mv(externalMemoryTarget)) {} + KJ_DISALLOW_COPY_AND_MOVE(ZstdCompressionStream); + static jsg::Ref constructor(jsg::Lock& js, ZlibModeValue mode); + + bool initialize(jsg::Lock& js, + jsg::BufferSource params, + jsg::BufferSource writeResult, + jsg::Function writeCallback, + jsg::Optional pledgedSrcSize); + + void params() { + // Currently a no-op, and not accessed from JS land. + } + + JSG_RESOURCE_TYPE(ZstdCompressionStream) { + JSG_INHERIT(CompressionStream); + + JSG_METHOD(initialize); + JSG_METHOD(params); + } + + CompressionContext* context() { + return this->CompressionStream::context(); + } + }; + using InputSource = kj::OneOf, kj::Array>; using CompressCallbackArg = kj::OneOf>; using CompressCallback = jsg::Function; @@ -436,6 +541,12 @@ class ZlibUtil final: public jsg::Object { void brotliWithCallback( jsg::Lock& js, InputSource data, BrotliContext::Options options, CompressCallback cb); + template + kj::Array zstdSync(jsg::Lock& js, InputSource data, ZstdContext::Options options); + template + void zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb); + JSG_RESOURCE_TYPE(ZlibUtil) { JSG_METHOD_NAMED(crc32, crc32Sync); JSG_METHOD(zlibSync); @@ -446,9 +557,16 @@ class ZlibUtil final: public jsg::Object { JSG_METHOD_NAMED(brotliDecompress, template brotliWithCallback); JSG_METHOD_NAMED(brotliCompress, template brotliWithCallback); + JSG_METHOD_NAMED(zstdDecompressSync, template zstdSync); + JSG_METHOD_NAMED(zstdCompressSync, template zstdSync); + JSG_METHOD_NAMED(zstdDecompress, template zstdWithCallback); + JSG_METHOD_NAMED(zstdCompress, template zstdWithCallback); + JSG_NESTED_TYPE(ZlibStream); JSG_NESTED_TYPE_NAMED(BrotliCompressionStream, BrotliEncoder); JSG_NESTED_TYPE_NAMED(BrotliCompressionStream, BrotliDecoder); + JSG_NESTED_TYPE_NAMED(ZstdCompressionStream, ZstdEncoder); + JSG_NESTED_TYPE_NAMED(ZstdCompressionStream, ZstdDecoder); // zlib.constants (part of the API contract for node:zlib) JSG_STATIC_CONSTANT_NAMED(CONST_Z_NO_FLUSH, Z_NO_FLUSH); @@ -597,6 +715,54 @@ class ZlibUtil final: public jsg::Object { BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES); JSG_STATIC_CONSTANT_NAMED( CONST_BROTLI_DECODER_ERROR_UNREACHABLE, BROTLI_DECODER_ERROR_UNREACHABLE); + + // Zstd mode constants + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_ENCODE, static_cast(ZlibMode::ZSTD_ENCODE)); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_DECODE, static_cast(ZlibMode::ZSTD_DECODE)); + + // Zstd flush directives + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_e_continue, ZSTD_e_continue); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_e_flush, ZSTD_e_flush); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_e_end, ZSTD_e_end); + + // Zstd compression parameters + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_compressionLevel, ZSTD_c_compressionLevel); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_windowLog, ZSTD_c_windowLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_hashLog, ZSTD_c_hashLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_chainLog, ZSTD_c_chainLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_searchLog, ZSTD_c_searchLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_minMatch, ZSTD_c_minMatch); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_targetLength, ZSTD_c_targetLength); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_strategy, ZSTD_c_strategy); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_enableLongDistanceMatching, + ZSTD_c_enableLongDistanceMatching); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmHashLog, ZSTD_c_ldmHashLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmMinMatch, ZSTD_c_ldmMinMatch); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmBucketSizeLog, ZSTD_c_ldmBucketSizeLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmHashRateLog, ZSTD_c_ldmHashRateLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_contentSizeFlag, ZSTD_c_contentSizeFlag); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_checksumFlag, ZSTD_c_checksumFlag); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_dictIDFlag, ZSTD_c_dictIDFlag); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_nbWorkers, ZSTD_c_nbWorkers); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_jobSize, ZSTD_c_jobSize); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_overlapLog, ZSTD_c_overlapLog); + + // Zstd decompression parameters + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_d_windowLogMax, ZSTD_d_windowLogMax); + + // Zstd strategy constants + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_fast, ZSTD_fast); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_dfast, ZSTD_dfast); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_greedy, ZSTD_greedy); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_lazy, ZSTD_lazy); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_lazy2, ZSTD_lazy2); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btlazy2, ZSTD_btlazy2); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btopt, ZSTD_btopt); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btultra, ZSTD_btultra); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btultra2, ZSTD_btultra2); + + // Zstd default compression level + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_CLEVEL_DEFAULT, ZSTD_CLEVEL_DEFAULT); } }; @@ -604,10 +770,15 @@ class ZlibUtil final: public jsg::Object { api::node::ZlibUtil, api::node::ZlibUtil::ZlibStream, \ api::node::ZlibUtil::BrotliCompressionStream, \ api::node::ZlibUtil::BrotliCompressionStream, \ + api::node::ZlibUtil::ZstdCompressionStream, \ + api::node::ZlibUtil::ZstdCompressionStream, \ api::node::ZlibUtil::CompressionStream, \ api::node::ZlibUtil::CompressionStream, \ api::node::ZlibUtil::CompressionStream, \ - api::node::ZlibContext::Options, api::node::BrotliContext::Options + api::node::ZlibUtil::CompressionStream, \ + api::node::ZlibUtil::CompressionStream, \ + api::node::ZlibContext::Options, api::node::BrotliContext::Options, \ + api::node::ZstdContext::Options } // namespace workerd::api::node From e1da60c2d3a96a3eb2943d49eead6432d40245ad Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 16:20:57 +0000 Subject: [PATCH 2/9] zlib-util minor formatting and asserts --- src/workerd/api/node/zlib-util.c++ | 33 +++++++++++++----------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index c4179bfbc72..53d7a9c2095 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -759,12 +759,8 @@ kj::Maybe BrotliDecoderContext::getError() const { // Zstd Implementation void ZstdContext::setBuffers(kj::ArrayPtr input, kj::ArrayPtr output) { - input_.src = input.begin(); - input_.size = input.size(); - input_.pos = 0; - output_.dst = output.begin(); - output_.size = output.size(); - output_.pos = 0; + setInputBuffer(input); + setOutputBuffer(output); } void ZstdContext::setInputBuffer(kj::ArrayPtr input) { @@ -780,6 +776,8 @@ void ZstdContext::setOutputBuffer(kj::ArrayPtr output) { } void ZstdContext::setFlush(int flush) { + KJ_DASSERT(flush >= ZSTD_e_continue && flush <= ZSTD_e_end, + "flush must be a valid ZSTD_EndDirective value"); flush_ = static_cast(flush); } @@ -792,9 +790,9 @@ void ZstdContext::getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) con *availOut = output_.size - output_.pos; } -ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode): ZstdContext(_mode) { - cctx_ = ZSTD_createCCtx(); -} +ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode): + ZstdContext(_mode), + cctx_(ZSTD_createCCtx()) {} ZstdEncoderContext::~ZstdEncoderContext() { if (cctx_ != nullptr) { @@ -804,9 +802,6 @@ ZstdEncoderContext::~ZstdEncoderContext() { } kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSize) { - if (cctx_ == nullptr) { - cctx_ = ZSTD_createCCtx(); - } if (cctx_ == nullptr) { return CompressionError( @@ -849,6 +844,8 @@ kj::Maybe ZstdEncoderContext::resetStream() { } kj::Maybe ZstdEncoderContext::setParams(int key, int value) { + KJ_DASSERT(key >= ZSTD_c_compressionLevel, + "key must be a valid ZSTD_cParameter (first valid value is ZSTD_c_compressionLevel)"); size_t result = ZSTD_CCtx_setParameter(cctx_, static_cast(key), value); if (ZSTD_isError(result)) { return CompressionError( @@ -872,9 +869,9 @@ kj::Maybe ZstdEncoderContext::getError() const { return kj::none; } -ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode): ZstdContext(_mode) { - dctx_ = ZSTD_createDCtx(); -} +ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode): + ZstdContext(_mode), + dctx_(ZSTD_createDCtx()) {} ZstdDecoderContext::~ZstdDecoderContext() { if (dctx_ != nullptr) { @@ -884,10 +881,8 @@ ZstdDecoderContext::~ZstdDecoderContext() { } kj::Maybe ZstdDecoderContext::initialize() { - if (dctx_ == nullptr) { - dctx_ = ZSTD_createDCtx(); - } - + // dctx_ is created in the constructor. It can only be nullptr if ZSTD_createDCtx() + // failed due to memory allocation failure. if (dctx_ == nullptr) { return CompressionError( "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); From 3e2357ed96eea245eeb75bfebaf2db62523577c9 Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 16:25:58 +0000 Subject: [PATCH 3/9] Bring zstd arguments in line with nodejs layout. --- src/node/internal/internal_zlib.ts | 8 +++++-- src/node/internal/internal_zlib_base.ts | 31 ++++++++++++++++++------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/node/internal/internal_zlib.ts b/src/node/internal/internal_zlib.ts index e112691a152..375261a45e6 100644 --- a/src/node/internal/internal_zlib.ts +++ b/src/node/internal/internal_zlib.ts @@ -16,6 +16,10 @@ import { Zlib, Brotli, Zstd, + zstdInitCParamsArray, + zstdInitDParamsArray, + kMaxZstdCParam, + kMaxZstdDParam, type ZlibBase, } from 'node-internal:internal_zlib_base'; @@ -477,13 +481,13 @@ export class BrotliDecompress extends Brotli { export class ZstdCompress extends Zstd { constructor(options: ZstdOptions) { - super(options, CONST_ZSTD_ENCODE); + super(options, CONST_ZSTD_ENCODE, zstdInitCParamsArray, kMaxZstdCParam); } } export class ZstdDecompress extends Zstd { constructor(options: ZstdOptions) { - super(options, CONST_ZSTD_DECODE); + super(options, CONST_ZSTD_DECODE, zstdInitDParamsArray, kMaxZstdDParam); } } diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts index af203d360dc..d6f546f75b2 100644 --- a/src/node/internal/internal_zlib_base.ts +++ b/src/node/internal/internal_zlib_base.ts @@ -812,12 +812,20 @@ export class Brotli extends ZlibBase { } } -const kMaxZstdParam = Math.max( +export const kMaxZstdCParam = Math.max( ...Object.entries(constants).map(([key, value]) => - key.startsWith('ZSTD_c_') || key.startsWith('ZSTD_d_') ? value : 0 + key.startsWith('ZSTD_c_') ? value : 0 ) ); -const zstdInitParamsArray = new Int32Array(kMaxZstdParam + 1); +export const zstdInitCParamsArray = new Int32Array(kMaxZstdCParam + 1); + +export const kMaxZstdDParam = Math.max( + ...Object.entries(constants).map(([key, value]) => + key.startsWith('ZSTD_d_') ? value : 0 + ) +); +export const zstdInitDParamsArray = new Int32Array(kMaxZstdDParam + 1); + const zstdDefaultOptions: ZlibDefaultOptions = { flush: CONST_ZSTD_e_continue, finishFlush: CONST_ZSTD_e_end, @@ -825,9 +833,14 @@ const zstdDefaultOptions: ZlibDefaultOptions = { }; export class Zstd extends ZlibBase { - constructor(options: ZstdOptions | undefined | null, mode: number) { + constructor( + options: ZstdOptions | undefined | null, + mode: number, + initParamsArray: Int32Array, + maxParam: number + ) { ok(mode === CONST_ZSTD_DECODE || mode === CONST_ZSTD_ENCODE); - zstdInitParamsArray.fill(-1); + initParamsArray.fill(-1); if (options?.params) { for (const [origKey, value] of Object.entries(options.params)) { @@ -835,8 +848,8 @@ export class Zstd extends ZlibBase { if ( Number.isNaN(key) || key < 0 || - key > kMaxZstdParam || - ((zstdInitParamsArray[key] as number) | 0) !== -1 + key > maxParam || + ((initParamsArray[key] as number) | 0) !== -1 ) { throw new ERR_ZSTD_INVALID_PARAM(origKey); } @@ -850,7 +863,7 @@ export class Zstd extends ZlibBase { } // as number is required to avoid force type coercion on runtime. // boolean has number representation, but typescript doesn't understand it. - zstdInitParamsArray[key] = value as number; + initParamsArray[key] = value as number; } } @@ -865,7 +878,7 @@ export class Zstd extends ZlibBase { if ( !handle.initialize( - zstdInitParamsArray, + initParamsArray, _writeState, processCallback.bind(handle), pledgedSrcSize From 3f9e7867d1803e6095de958aaac22ff002d53373 Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 16:30:04 +0000 Subject: [PATCH 4/9] Test updates --- src/workerd/api/node/tests/BUILD.bazel | 4 +-- ...odejs-test.js => zlib-zstd-nodejs-test.js} | 32 ++++++++++++++----- ....wd-test => zlib-zstd-nodejs-test.wd-test} | 2 +- 3 files changed, 27 insertions(+), 11 deletions(-) rename src/workerd/api/node/tests/{zstd-nodejs-test.js => zlib-zstd-nodejs-test.js} (92%) rename src/workerd/api/node/tests/{zstd-nodejs-test.wd-test => zlib-zstd-nodejs-test.wd-test} (80%) diff --git a/src/workerd/api/node/tests/BUILD.bazel b/src/workerd/api/node/tests/BUILD.bazel index ed9b7286b2c..b12b20afdeb 100644 --- a/src/workerd/api/node/tests/BUILD.bazel +++ b/src/workerd/api/node/tests/BUILD.bazel @@ -218,9 +218,9 @@ wd_test( wd_test( size = "large", - src = "zstd-nodejs-test.wd-test", + src = "zlib-zstd-nodejs-test.wd-test", args = ["--experimental"], - data = ["zstd-nodejs-test.js"], + data = ["zlib-zstd-nodejs-test.js"], ) wd_test( diff --git a/src/workerd/api/node/tests/zstd-nodejs-test.js b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js similarity index 92% rename from src/workerd/api/node/tests/zstd-nodejs-test.js rename to src/workerd/api/node/tests/zlib-zstd-nodejs-test.js index 33864af1d88..169fb640dd9 100644 --- a/src/workerd/api/node/tests/zstd-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js @@ -1,12 +1,7 @@ import assert from 'node:assert'; import { Buffer } from 'node:buffer'; -import { promisify } from 'node:util'; import zlib from 'node:zlib'; -// Helper function to promisify callbacks -const zstdCompressAsync = promisify(zlib.zstdCompress); -const zstdDecompressAsync = promisify(zlib.zstdDecompress); - // Basic sync compress/decompress test export const zstdBasicSyncTest = { test() { @@ -25,11 +20,26 @@ export const zstdBasicSyncTest = { export const zstdBasicAsyncTest = { async test() { const input = Buffer.from('Hello, async Zstd compression!'); - const compressed = await zstdCompressAsync(input); + + const { promise: compressPromise, resolve: compressResolve, reject: compressReject } = + Promise.withResolvers(); + zlib.zstdCompress(input, (err, res) => { + if (err) compressReject(err); + else compressResolve(res); + }); + const compressed = await compressPromise; + assert(Buffer.isBuffer(compressed), 'Compressed output should be a buffer'); assert(compressed.length > 0, 'Compressed output should not be empty'); - const decompressed = await zstdDecompressAsync(compressed); + const { promise: decompressPromise, resolve: decompressResolve, reject: decompressReject } = + Promise.withResolvers(); + zlib.zstdDecompress(compressed, (err, res) => { + if (err) decompressReject(err); + else decompressResolve(res); + }); + const decompressed = await decompressPromise; + assert(Buffer.isBuffer(decompressed), 'Decompressed output should be a buffer'); assert.strictEqual(decompressed.toString(), input.toString(), 'Round-trip should match'); }, @@ -237,8 +247,14 @@ export const zstdCallbackErrorTest = { async test() { const invalidData = Buffer.from('invalid zstd data'); + const { promise, resolve, reject } = Promise.withResolvers(); + zlib.zstdDecompress(invalidData, (err, res) => { + if (err) reject(err); + else resolve(res); + }); + try { - await zstdDecompressAsync(invalidData); + await promise; assert.fail('Should have thrown'); } catch (err) { assert(err instanceof Error, 'Should receive an error'); diff --git a/src/workerd/api/node/tests/zstd-nodejs-test.wd-test b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.wd-test similarity index 80% rename from src/workerd/api/node/tests/zstd-nodejs-test.wd-test rename to src/workerd/api/node/tests/zlib-zstd-nodejs-test.wd-test index ee04da3d80b..9283590a92d 100644 --- a/src/workerd/api/node/tests/zstd-nodejs-test.wd-test +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.wd-test @@ -5,7 +5,7 @@ const unitTests :Workerd.Config = ( ( name = "nodejs-zstd-test", worker = ( modules = [ - (name = "worker", esModule = embed "zstd-nodejs-test.js") + (name = "worker", esModule = embed "zlib-zstd-nodejs-test.js") ], compatibilityFlags = ["experimental", "nodejs_compat", "nodejs_compat_v2", "nodejs_zlib"], ) From 08acff0d400828549db00ff46abae2282c2cf0c6 Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 16:44:37 +0000 Subject: [PATCH 5/9] Add zstdCheckError helper function. --- src/workerd/api/node/zlib-util.c++ | 31 ++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index 53d7a9c2095..b4027204c96 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -790,6 +790,19 @@ void ZstdContext::getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) con *availOut = output_.size - output_.pos; } +namespace { +// Helper to check ZSTD errors and return a CompressionError if present. +// Also sets the error code in the provided reference for later retrieval. +kj::Maybe zstdCheckError( + size_t result, ZSTD_ErrorCode& error, kj::StringPtr errorCode) { + if (ZSTD_isError(result)) { + error = ZSTD_getErrorCode(result); + return CompressionError(ZSTD_getErrorName(result), errorCode, -1); + } + return kj::none; +} +} // namespace + ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode): ZstdContext(_mode), cctx_(ZSTD_createCCtx()) {} @@ -810,10 +823,8 @@ kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSi if (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN) { size_t result = ZSTD_CCtx_setPledgedSrcSize(cctx_, pledgedSrcSize); - if (ZSTD_isError(result)) { - error_ = ZSTD_getErrorCode(result); - return CompressionError(kj::str(ZSTD_getErrorName(result)), - kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); + KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_COMPRESSION_FAILED"_kj)) { + return kj::mv(err); } } @@ -834,10 +845,8 @@ void ZstdEncoderContext::work() { kj::Maybe ZstdEncoderContext::resetStream() { if (cctx_ != nullptr) { size_t result = ZSTD_CCtx_reset(cctx_, ZSTD_reset_session_only); - if (ZSTD_isError(result)) { - error_ = ZSTD_getErrorCode(result); - return CompressionError(kj::str(ZSTD_getErrorName(result)), - kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); + KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_COMPRESSION_FAILED"_kj)) { + return kj::mv(err); } } return kj::none; @@ -905,10 +914,8 @@ void ZstdDecoderContext::work() { kj::Maybe ZstdDecoderContext::resetStream() { if (dctx_ != nullptr) { size_t result = ZSTD_DCtx_reset(dctx_, ZSTD_reset_session_only); - if (ZSTD_isError(result)) { - error_ = ZSTD_getErrorCode(result); - return CompressionError(kj::str(ZSTD_getErrorName(result)), - kj::str("ERR_ZSTD_DECOMPRESSION_FAILED"), -1); + KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_DECOMPRESSION_FAILED"_kj)) { + return kj::mv(err); } } return kj::none; From 915c17a2caec024c280aa02f0fbbb1c33d758d99 Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 16:45:39 +0000 Subject: [PATCH 6/9] Remove duplicating test --- .../api/node/tests/zlib-zstd-nodejs-test.js | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js index 169fb640dd9..ae8d246e71e 100644 --- a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js @@ -164,47 +164,6 @@ export const zstdEmptyInputTest = { }, }; -// Test constants are exported correctly -export const zstdConstantsTest = { - test() { - // Flush directives - assert.strictEqual(typeof zlib.constants.ZSTD_e_continue, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_e_flush, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_e_end, 'number'); - - // Compression parameters - assert.strictEqual(typeof zlib.constants.ZSTD_c_compressionLevel, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_windowLog, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_hashLog, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_chainLog, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_searchLog, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_minMatch, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_targetLength, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_c_strategy, 'number'); - - // Decompression parameters - assert.strictEqual(typeof zlib.constants.ZSTD_d_windowLogMax, 'number'); - - // Strategy constants - assert.strictEqual(typeof zlib.constants.ZSTD_fast, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_dfast, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_greedy, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_lazy, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_lazy2, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_btlazy2, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_btopt, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_btultra, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_btultra2, 'number'); - - // Default compression level - assert.strictEqual(typeof zlib.constants.ZSTD_CLEVEL_DEFAULT, 'number'); - - // Mode constants - assert.strictEqual(typeof zlib.constants.ZSTD_ENCODE, 'number'); - assert.strictEqual(typeof zlib.constants.ZSTD_DECODE, 'number'); - }, -}; - // Test invalid compressed data export const zstdInvalidDataTest = { test() { From 229531797be98b72f6c307e696ce1f38532a0266 Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Tue, 3 Feb 2026 16:52:47 +0000 Subject: [PATCH 7/9] Use smart pointer for zstd context. --- src/workerd/api/node/zlib-util.c++ | 55 +++++++++++++----------------- src/workerd/api/node/zlib-util.h | 8 ++--- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index b4027204c96..fc208b45dfe 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -801,28 +801,28 @@ kj::Maybe zstdCheckError( } return kj::none; } + +// Wrappers for ZSTD free functions that return void (for use with kj::disposeWith). +void zstdFreeCCtx(ZSTD_CCtx* cctx) { + ZSTD_freeCCtx(cctx); +} +void zstdFreeDCtx(ZSTD_DCtx* dctx) { + ZSTD_freeDCtx(dctx); +} } // namespace ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode): - ZstdContext(_mode), - cctx_(ZSTD_createCCtx()) {} - -ZstdEncoderContext::~ZstdEncoderContext() { - if (cctx_ != nullptr) { - ZSTD_freeCCtx(cctx_); - cctx_ = nullptr; - } -} + ZstdContext(_mode), + cctx_(kj::disposeWith(ZSTD_createCCtx())) {} kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSize) { - - if (cctx_ == nullptr) { + if (cctx_.get() == nullptr) { return CompressionError( "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); } if (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN) { - size_t result = ZSTD_CCtx_setPledgedSrcSize(cctx_, pledgedSrcSize); + size_t result = ZSTD_CCtx_setPledgedSrcSize(cctx_.get(), pledgedSrcSize); KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_COMPRESSION_FAILED"_kj)) { return kj::mv(err); } @@ -833,9 +833,9 @@ kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSi void ZstdEncoderContext::work() { JSG_REQUIRE(mode == ZlibMode::ZSTD_ENCODE, Error, "Mode should be ZSTD_ENCODE"_kj); - JSG_REQUIRE(cctx_ != nullptr, Error, "Zstd context should not be null"_kj); + JSG_REQUIRE(cctx_.get() != nullptr, Error, "Zstd context should not be null"_kj); - lastResult = ZSTD_compressStream2(cctx_, &output_, &input_, flush_); + lastResult = ZSTD_compressStream2(cctx_.get(), &output_, &input_, flush_); if (ZSTD_isError(lastResult)) { error_ = ZSTD_getErrorCode(lastResult); @@ -843,8 +843,8 @@ void ZstdEncoderContext::work() { } kj::Maybe ZstdEncoderContext::resetStream() { - if (cctx_ != nullptr) { - size_t result = ZSTD_CCtx_reset(cctx_, ZSTD_reset_session_only); + if (cctx_.get() != nullptr) { + size_t result = ZSTD_CCtx_reset(cctx_.get(), ZSTD_reset_session_only); KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_COMPRESSION_FAILED"_kj)) { return kj::mv(err); } @@ -855,7 +855,7 @@ kj::Maybe ZstdEncoderContext::resetStream() { kj::Maybe ZstdEncoderContext::setParams(int key, int value) { KJ_DASSERT(key >= ZSTD_c_compressionLevel, "key must be a valid ZSTD_cParameter (first valid value is ZSTD_c_compressionLevel)"); - size_t result = ZSTD_CCtx_setParameter(cctx_, static_cast(key), value); + size_t result = ZSTD_CCtx_setParameter(cctx_.get(), static_cast(key), value); if (ZSTD_isError(result)) { return CompressionError( kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), @@ -880,19 +880,12 @@ kj::Maybe ZstdEncoderContext::getError() const { ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode): ZstdContext(_mode), - dctx_(ZSTD_createDCtx()) {} - -ZstdDecoderContext::~ZstdDecoderContext() { - if (dctx_ != nullptr) { - ZSTD_freeDCtx(dctx_); - dctx_ = nullptr; - } -} + dctx_(kj::disposeWith(ZSTD_createDCtx())) {} kj::Maybe ZstdDecoderContext::initialize() { // dctx_ is created in the constructor. It can only be nullptr if ZSTD_createDCtx() // failed due to memory allocation failure. - if (dctx_ == nullptr) { + if (dctx_.get() == nullptr) { return CompressionError( "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); } @@ -902,9 +895,9 @@ kj::Maybe ZstdDecoderContext::initialize() { void ZstdDecoderContext::work() { JSG_REQUIRE(mode == ZlibMode::ZSTD_DECODE, Error, "Mode should be ZSTD_DECODE"_kj); - JSG_REQUIRE(dctx_ != nullptr, Error, "Zstd context should not be null"_kj); + JSG_REQUIRE(dctx_.get() != nullptr, Error, "Zstd context should not be null"_kj); - lastResult = ZSTD_decompressStream(dctx_, &output_, &input_); + lastResult = ZSTD_decompressStream(dctx_.get(), &output_, &input_); if (ZSTD_isError(lastResult)) { error_ = ZSTD_getErrorCode(lastResult); @@ -912,8 +905,8 @@ void ZstdDecoderContext::work() { } kj::Maybe ZstdDecoderContext::resetStream() { - if (dctx_ != nullptr) { - size_t result = ZSTD_DCtx_reset(dctx_, ZSTD_reset_session_only); + if (dctx_.get() != nullptr) { + size_t result = ZSTD_DCtx_reset(dctx_.get(), ZSTD_reset_session_only); KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_DECOMPRESSION_FAILED"_kj)) { return kj::mv(err); } @@ -922,7 +915,7 @@ kj::Maybe ZstdDecoderContext::resetStream() { } kj::Maybe ZstdDecoderContext::setParams(int key, int value) { - size_t result = ZSTD_DCtx_setParameter(dctx_, static_cast(key), value); + size_t result = ZSTD_DCtx_setParameter(dctx_.get(), static_cast(key), value); if (ZSTD_isError(result)) { return CompressionError( kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index 425bc98c5f0..ecbdaa73948 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -323,7 +323,6 @@ class ZstdEncoderContext final: public ZstdContext { public: static const ZlibMode Mode = ZlibMode::ZSTD_ENCODE; explicit ZstdEncoderContext(ZlibMode _mode); - ~ZstdEncoderContext(); KJ_DISALLOW_COPY_AND_MOVE(ZstdEncoderContext); void work(); @@ -334,7 +333,7 @@ class ZstdEncoderContext final: public ZstdContext { private: size_t lastResult = 0; - ZSTD_CCtx* cctx_ = nullptr; + kj::Own cctx_; ZSTD_ErrorCode error_ = ZSTD_error_no_error; }; @@ -342,7 +341,6 @@ class ZstdDecoderContext final: public ZstdContext { public: static const ZlibMode Mode = ZlibMode::ZSTD_DECODE; explicit ZstdDecoderContext(ZlibMode _mode); - ~ZstdDecoderContext(); KJ_DISALLOW_COPY_AND_MOVE(ZstdDecoderContext); void work(); @@ -353,7 +351,7 @@ class ZstdDecoderContext final: public ZstdContext { private: size_t lastResult = 0; - ZSTD_DCtx* dctx_ = nullptr; + kj::Own dctx_; ZSTD_ErrorCode error_ = ZSTD_error_no_error; }; @@ -784,3 +782,5 @@ class ZlibUtil final: public jsg::Object { KJ_DECLARE_NON_POLYMORPHIC(BrotliEncoderStateStruct) KJ_DECLARE_NON_POLYMORPHIC(BrotliDecoderStateStruct) +KJ_DECLARE_NON_POLYMORPHIC(ZSTD_CCtx) +KJ_DECLARE_NON_POLYMORPHIC(ZSTD_DCtx) From e77c44a1783b42deecff22c40d02f2c3aea116fc Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Wed, 11 Feb 2026 21:47:12 +0000 Subject: [PATCH 8/9] Fix formatting. --- .../api/node/tests/zlib-zstd-nodejs-test.js | 105 ++++++++++++++---- src/workerd/api/node/zlib-util.c++ | 21 ++-- src/workerd/api/node/zlib-util.h | 4 +- 3 files changed, 95 insertions(+), 35 deletions(-) diff --git a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js index ae8d246e71e..64c353b18c7 100644 --- a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js @@ -11,8 +11,15 @@ export const zstdBasicSyncTest = { assert(compressed.length > 0, 'Compressed output should not be empty'); const decompressed = zlib.zstdDecompressSync(compressed); - assert(Buffer.isBuffer(decompressed), 'Decompressed output should be a buffer'); - assert.strictEqual(decompressed.toString(), input.toString(), 'Round-trip should match'); + assert( + Buffer.isBuffer(decompressed), + 'Decompressed output should be a buffer' + ); + assert.strictEqual( + decompressed.toString(), + input.toString(), + 'Round-trip should match' + ); }, }; @@ -21,8 +28,11 @@ export const zstdBasicAsyncTest = { async test() { const input = Buffer.from('Hello, async Zstd compression!'); - const { promise: compressPromise, resolve: compressResolve, reject: compressReject } = - Promise.withResolvers(); + const { + promise: compressPromise, + resolve: compressResolve, + reject: compressReject, + } = Promise.withResolvers(); zlib.zstdCompress(input, (err, res) => { if (err) compressReject(err); else compressResolve(res); @@ -32,16 +42,26 @@ export const zstdBasicAsyncTest = { assert(Buffer.isBuffer(compressed), 'Compressed output should be a buffer'); assert(compressed.length > 0, 'Compressed output should not be empty'); - const { promise: decompressPromise, resolve: decompressResolve, reject: decompressReject } = - Promise.withResolvers(); + const { + promise: decompressPromise, + resolve: decompressResolve, + reject: decompressReject, + } = Promise.withResolvers(); zlib.zstdDecompress(compressed, (err, res) => { if (err) decompressReject(err); else decompressResolve(res); }); const decompressed = await decompressPromise; - assert(Buffer.isBuffer(decompressed), 'Decompressed output should be a buffer'); - assert.strictEqual(decompressed.toString(), input.toString(), 'Round-trip should match'); + assert( + Buffer.isBuffer(decompressed), + 'Decompressed output should be a buffer' + ); + assert.strictEqual( + decompressed.toString(), + input.toString(), + 'Round-trip should match' + ); }, }; @@ -51,7 +71,11 @@ export const zstdStringInputTest = { const input = 'This is a string input for Zstd compression'; const compressed = zlib.zstdCompressSync(input); const decompressed = zlib.zstdDecompressSync(compressed); - assert.strictEqual(decompressed.toString(), input, 'String input round-trip should match'); + assert.strictEqual( + decompressed.toString(), + input, + 'String input round-trip should match' + ); }, }; @@ -65,7 +89,10 @@ export const zstdLargeDataTest = { } const compressed = zlib.zstdCompressSync(input); - assert(compressed.length < input.length, 'Compressed should be smaller than input'); + assert( + compressed.length < input.length, + 'Compressed should be smaller than input' + ); const decompressed = zlib.zstdDecompressSync(compressed); assert(input.equals(decompressed), 'Large data round-trip should match'); @@ -75,7 +102,9 @@ export const zstdLargeDataTest = { // Test with different compression levels export const zstdCompressionLevelsTest = { test() { - const input = Buffer.from('Test data for compression level testing'.repeat(100)); + const input = Buffer.from( + 'Test data for compression level testing'.repeat(100) + ); // Test compression level 1 (fastest) const compressedFast = zlib.zstdCompressSync(input, { @@ -91,8 +120,14 @@ export const zstdCompressionLevelsTest = { const decompressedFast = zlib.zstdDecompressSync(compressedFast); const decompressedBest = zlib.zstdDecompressSync(compressedBest); - assert(input.equals(decompressedFast), 'Fast compression should decompress correctly'); - assert(input.equals(decompressedBest), 'Best compression should decompress correctly'); + assert( + input.equals(decompressedFast), + 'Fast compression should decompress correctly' + ); + assert( + input.equals(decompressedBest), + 'Best compression should decompress correctly' + ); // Higher compression level should typically produce smaller output assert( @@ -160,7 +195,11 @@ export const zstdEmptyInputTest = { const input = Buffer.alloc(0); const compressed = zlib.zstdCompressSync(input); const decompressed = zlib.zstdDecompressSync(compressed); - assert.strictEqual(decompressed.length, 0, 'Empty input should produce empty output'); + assert.strictEqual( + decompressed.length, + 0, + 'Empty input should produce empty output' + ); }, }; @@ -181,8 +220,13 @@ export const zstdChunkSizeTest = { test() { const input = Buffer.from('Testing chunk size option'.repeat(100)); const compressed = zlib.zstdCompressSync(input, { chunkSize: 1024 }); - const decompressed = zlib.zstdDecompressSync(compressed, { chunkSize: 1024 }); - assert(input.equals(decompressed), 'Custom chunkSize should work correctly'); + const decompressed = zlib.zstdDecompressSync(compressed, { + chunkSize: 1024, + }); + assert( + input.equals(decompressed), + 'Custom chunkSize should work correctly' + ); }, }; @@ -245,7 +289,10 @@ export const zstdInfoOptionTest = { const result = zlib.zstdCompressSync(input, { info: true }); // When info is true, result should be an object with buffer and engine properties - assert(typeof result === 'object', 'Result should be an object when info is true'); + assert( + typeof result === 'object', + 'Result should be an object when info is true' + ); assert(result.buffer, 'Result should have a buffer property'); assert(result.engine, 'Result should have an engine property'); }, @@ -254,8 +301,16 @@ export const zstdInfoOptionTest = { // Test classes are exported export const zstdClassesExportedTest = { test() { - assert.strictEqual(typeof zlib.ZstdCompress, 'function', 'ZstdCompress should be exported'); - assert.strictEqual(typeof zlib.ZstdDecompress, 'function', 'ZstdDecompress should be exported'); + assert.strictEqual( + typeof zlib.ZstdCompress, + 'function', + 'ZstdCompress should be exported' + ); + assert.strictEqual( + typeof zlib.ZstdDecompress, + 'function', + 'ZstdDecompress should be exported' + ); assert.strictEqual( typeof zlib.createZstdCompress, 'function', @@ -282,7 +337,15 @@ export const zstdSyncFunctionsExportedTest = { 'function', 'zstdDecompressSync should be exported' ); - assert.strictEqual(typeof zlib.zstdCompress, 'function', 'zstdCompress should be exported'); - assert.strictEqual(typeof zlib.zstdDecompress, 'function', 'zstdDecompress should be exported'); + assert.strictEqual( + typeof zlib.zstdCompress, + 'function', + 'zstdCompress should be exported' + ); + assert.strictEqual( + typeof zlib.zstdDecompress, + 'function', + 'zstdDecompress should be exported' + ); }, }; diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index fc208b45dfe..919292e2d62 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -811,9 +811,9 @@ void zstdFreeDCtx(ZSTD_DCtx* dctx) { } } // namespace -ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode): - ZstdContext(_mode), - cctx_(kj::disposeWith(ZSTD_createCCtx())) {} +ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode) + : ZstdContext(_mode), + cctx_(kj::disposeWith(ZSTD_createCCtx())) {} kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSize) { if (cctx_.get() == nullptr) { @@ -857,8 +857,7 @@ kj::Maybe ZstdEncoderContext::setParams(int key, int value) { "key must be a valid ZSTD_cParameter (first valid value is ZSTD_c_compressionLevel)"); size_t result = ZSTD_CCtx_setParameter(cctx_.get(), static_cast(key), value); if (ZSTD_isError(result)) { - return CompressionError( - kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), + return CompressionError(kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), "ERR_ZSTD_PARAM_SET_FAILED"_kj, -1); } return kj::none; @@ -878,9 +877,9 @@ kj::Maybe ZstdEncoderContext::getError() const { return kj::none; } -ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode): - ZstdContext(_mode), - dctx_(kj::disposeWith(ZSTD_createDCtx())) {} +ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode) + : ZstdContext(_mode), + dctx_(kj::disposeWith(ZSTD_createDCtx())) {} kj::Maybe ZstdDecoderContext::initialize() { // dctx_ is created in the constructor. It can only be nullptr if ZSTD_createDCtx() @@ -917,8 +916,7 @@ kj::Maybe ZstdDecoderContext::resetStream() { kj::Maybe ZstdDecoderContext::setParams(int key, int value) { size_t result = ZSTD_DCtx_setParameter(dctx_.get(), static_cast(key), value); if (ZSTD_isError(result)) { - return CompressionError( - kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), + return CompressionError(kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), "ERR_ZSTD_PARAM_SET_FAILED"_kj, -1); } return kj::none; @@ -1162,8 +1160,7 @@ void ZlibUtil::brotliWithCallback( } template -kj::Array ZlibUtil::zstdSync( - jsg::Lock& js, InputSource data, ZstdContext::Options opts) { +kj::Array ZlibUtil::zstdSync(jsg::Lock& js, InputSource data, ZstdContext::Options opts) { Context ctx(Context::Mode); auto chunkSize = opts.chunkSize.orDefault(ZLIB_PERFORMANT_CHUNK_SIZE); diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index ecbdaa73948..af2a97b08ad 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -732,8 +732,8 @@ class ZlibUtil final: public jsg::Object { JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_minMatch, ZSTD_c_minMatch); JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_targetLength, ZSTD_c_targetLength); JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_strategy, ZSTD_c_strategy); - JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_enableLongDistanceMatching, - ZSTD_c_enableLongDistanceMatching); + JSG_STATIC_CONSTANT_NAMED( + CONST_ZSTD_c_enableLongDistanceMatching, ZSTD_c_enableLongDistanceMatching); JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmHashLog, ZSTD_c_ldmHashLog); JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmMinMatch, ZSTD_c_ldmMinMatch); JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmBucketSizeLog, ZSTD_c_ldmBucketSizeLog); From 395ce8ac651980c42d53a518be4abf11d7260f1a Mon Sep 17 00:00:00 2001 From: alistairjevans Date: Thu, 12 Feb 2026 16:02:37 +0000 Subject: [PATCH 9/9] Fix bug where large files with many chunks would stack-overflow with a sync callback --- src/node/internal/internal_zlib_base.ts | 4 ++- .../api/node/tests/zlib-zstd-nodejs-test.js | 26 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts index d6f546f75b2..bfcd48e6745 100644 --- a/src/node/internal/internal_zlib_base.ts +++ b/src/node/internal/internal_zlib_base.ts @@ -880,7 +880,9 @@ export class Zstd extends ZlibBase { !handle.initialize( initParamsArray, _writeState, - processCallback.bind(handle), + () => { + queueMicrotask(processCallback.bind(handle)); + }, pledgedSrcSize ) ) { diff --git a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js index 64c353b18c7..1274715095b 100644 --- a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js @@ -349,3 +349,29 @@ export const zstdSyncFunctionsExportedTest = { ); }, }; + +// Test stream decompression with large output and small chunkSize to exercise +// the processCallback recursion path; verify we don't get a stack overflow. +export const zstdStreamLargeDecompressTest = { + async test() { + // 1MB of compressible data — compresses small, decompresses large. + // With chunkSize=64, this requires ~16,000 output chunks, which triggers + // deep recursion in processCallback if the callback is synchronous. + const input = Buffer.alloc(1024 * 1024, 'A'); + const compressed = zlib.zstdCompressSync(input); + + const decompress = zlib.createZstdDecompress({ chunkSize: 64 }); + let totalBytes = 0; + + decompress.end(compressed); + for await (const chunk of decompress) { + totalBytes += chunk.length; + } + + assert.strictEqual( + totalBytes, + input.length, + `Decompressed size should be ${input.length}, got ${totalBytes}` + ); + }, +};