diff --git a/build/deps/deps.jsonc b/build/deps/deps.jsonc index 8381bfe5976..aebc7eba028 100644 --- a/build/deps/deps.jsonc +++ b/build/deps/deps.jsonc @@ -23,6 +23,10 @@ "name": "brotli", "type": "bazel_dep" }, + { + "name": "zstd", + "type": "bazel_dep" + }, { "name": "tcmalloc", "type": "bazel_dep" diff --git a/build/deps/gen/deps.MODULE.bazel b/build/deps/gen/deps.MODULE.bazel index aa17edf7756..3c74788472c 100644 --- a/build/deps/gen/deps.MODULE.bazel +++ b/build/deps/gen/deps.MODULE.bazel @@ -145,3 +145,6 @@ git_override( ], remote = "https://chromium.googlesource.com/chromium/src/third_party/zlib.git", ) + +# zstd +bazel_dep(name = "zstd", version = "1.5.7.bcr.1") diff --git a/src/node/internal/internal_errors.ts b/src/node/internal/internal_errors.ts index a062b46eda6..554a8d228e7 100644 --- a/src/node/internal/internal_errors.ts +++ b/src/node/internal/internal_errors.ts @@ -556,6 +556,12 @@ export class ERR_BROTLI_INVALID_PARAM extends NodeRangeError { } } +export class ERR_ZSTD_INVALID_PARAM extends NodeRangeError { + constructor(value: unknown) { + super('ERR_ZSTD_INVALID_PARAM', `${value} is not a valid Zstd parameter`); + } +} + export class ERR_ZLIB_INITIALIZATION_FAILED extends NodeError { constructor() { super('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed'); diff --git a/src/node/internal/internal_zlib.ts b/src/node/internal/internal_zlib.ts index 1321a10b6f9..375261a45e6 100644 --- a/src/node/internal/internal_zlib.ts +++ b/src/node/internal/internal_zlib.ts @@ -7,11 +7,21 @@ import { default as zlibUtil, type ZlibOptions, type BrotliOptions, + type ZstdOptions, } from 'node-internal:zlib'; import { Buffer } from 'node-internal:internal_buffer'; import { validateUint32 } from 'node-internal:validators'; import { ERR_INVALID_ARG_TYPE } from 'node-internal:internal_errors'; -import { Zlib, Brotli, type ZlibBase } from 'node-internal:internal_zlib_base'; +import { + Zlib, + Brotli, + Zstd, + zstdInitCParamsArray, + zstdInitDParamsArray, + 
kMaxZstdCParam, + kMaxZstdDParam, + type ZlibBase, +} from 'node-internal:internal_zlib_base'; type ZlibResult = Buffer | { buffer: Buffer; engine: ZlibBase }; type CompressCallback = (err: Error | null, result?: ZlibResult) => void; @@ -26,6 +36,8 @@ const { CONST_UNZIP, CONST_BROTLI_DECODE, CONST_BROTLI_ENCODE, + CONST_ZSTD_ENCODE, + CONST_ZSTD_DECODE, } = zlibUtil; const ZlibMode = { @@ -328,6 +340,88 @@ export function brotliCompress( processChunkCaptureError(new BrotliCompress(options), data, callback); } + +export function zstdDecompressSync( + data: ArrayBufferView | string, + options: ZstdOptions = {} +): ZlibResult { + if (!options.info) { + // Fast path, where we send the data directly to C++ + return Buffer.from(zlibUtil.zstdDecompressSync(data, options)); + } + + // Else, use the Engine class in sync mode + return processChunk(new ZstdDecompress(options), data); +} + +export function zstdCompressSync( + data: ArrayBufferView | string, + options: ZstdOptions = {} +): ZlibResult { + if (!options.info) { + // Fast path, where we send the data directly to C++ + return Buffer.from(zlibUtil.zstdCompressSync(data, options)); + } + + // Else, use the Engine class in sync mode + return processChunk(new ZstdCompress(options), data); +} + +export function zstdDecompress( + data: ArrayBufferView | string, + optionsOrCallback: ZstdOptions | CompressCallback, + callbackOrUndefined?: CompressCallback +): void { + const [options, callback] = normalizeArgs( + optionsOrCallback, + callbackOrUndefined + ); + + if (!options.info) { + // Fast path + zlibUtil.zstdDecompress(data, options, (res) => { + queueMicrotask(() => { + if (res instanceof Error) { + callback(res); + } else { + callback(null, Buffer.from(res)); + } + }); + }); + + return; + } + + processChunkCaptureError(new ZstdDecompress(options), data, callback); +} + +export function zstdCompress( + data: ArrayBufferView | string, + optionsOrCallback: ZstdOptions | CompressCallback, + callbackOrUndefined?: 
CompressCallback +): void { + const [options, callback] = normalizeArgs( + optionsOrCallback, + callbackOrUndefined + ); + + if (!options.info) { + // Fast path + zlibUtil.zstdCompress(data, options, (res) => { + queueMicrotask(() => { + if (res instanceof Error) { + callback(res); + } else { + callback(null, Buffer.from(res)); + } + }); + }); + + return; + } + + processChunkCaptureError(new ZstdCompress(options), data, callback); +} export class Gzip extends Zlib { constructor(options: ZlibOptions) { super(options, CONST_GZIP); @@ -385,6 +479,18 @@ export class BrotliDecompress extends Brotli { } } +export class ZstdCompress extends Zstd { + constructor(options?: ZstdOptions) { + super(options, CONST_ZSTD_ENCODE, zstdInitCParamsArray, kMaxZstdCParam); + } +} + +export class ZstdDecompress extends Zstd { + constructor(options?: ZstdOptions) { + super(options, CONST_ZSTD_DECODE, zstdInitDParamsArray, kMaxZstdDParam); + } +} + const CLASS_BY_MODE: Record< (typeof ZlibMode)[keyof typeof ZlibMode], | typeof Deflate @@ -440,3 +546,13 @@ export function createBrotliDecompress( ): BrotliDecompress { return new BrotliDecompress(options); } + +/** Creates a Zstd compression stream; `options` may be omitted. */ +export function createZstdCompress(options?: ZstdOptions): ZstdCompress { + return new ZstdCompress(options); +} + +/** Creates a Zstd decompression stream; `options` may be omitted. */ +export function createZstdDecompress(options?: ZstdOptions): ZstdDecompress { + return new ZstdDecompress(options); +} diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts index 68e8adf7707..bfcd48e6745 100644 --- a/src/node/internal/internal_zlib_base.ts +++ b/src/node/internal/internal_zlib_base.ts @@ -7,6 +7,7 @@ import { default as zlibUtil, type ZlibOptions, type BrotliOptions, + type ZstdOptions, } from 'node-internal:zlib'; import { Buffer, kMaxLength } from 'node-internal:internal_buffer'; import { @@ -18,6 +19,7 @@ import { ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, ERR_BROTLI_INVALID_PARAM, + ERR_ZSTD_INVALID_PARAM, ERR_ZLIB_INITIALIZATION_FAILED, NodeError, } from
'node-internal:internal_errors'; @@ -62,20 +64,29 @@ const { CONST_BROTLI_OPERATION_EMIT_METADATA, CONST_BROTLI_OPERATION_FINISH, CONST_BROTLI_OPERATION_FLUSH, + CONST_ZSTD_ENCODE, + CONST_ZSTD_DECODE, + CONST_ZSTD_e_continue, + CONST_ZSTD_e_end, + CONST_ZSTD_e_flush, } = zlibUtil; // This type contains all possible handler types. type ZlibHandleType = | zlibUtil.ZlibStream | zlibUtil.BrotliEncoder - | zlibUtil.BrotliDecoder; + | zlibUtil.BrotliDecoder + | zlibUtil.ZstdEncoder + | zlibUtil.ZstdDecoder; export const owner_symbol = Symbol('owner'); const FLUSH_BOUND_IDX_NORMAL: number = 0; const FLUSH_BOUND_IDX_BROTLI: number = 1; -const FLUSH_BOUND: [[number, number], [number, number]] = [ +const FLUSH_BOUND_IDX_ZSTD: number = 2; +const FLUSH_BOUND: [[number, number], [number, number], [number, number]] = [ [CONST_Z_NO_FLUSH, CONST_Z_BLOCK], [CONST_BROTLI_OPERATION_PROCESS, CONST_BROTLI_OPERATION_EMIT_METADATA], + [CONST_ZSTD_e_continue, CONST_ZSTD_e_end], ]; const kFlushFlag = Symbol('kFlushFlag'); @@ -369,10 +380,12 @@ export class ZlibBase extends Transform { let maxOutputLength = kMaxLength; let flushBoundIdx; - if (mode !== CONST_BROTLI_ENCODE && mode !== CONST_BROTLI_DECODE) { - flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; - } else { + if (mode === CONST_BROTLI_ENCODE || mode === CONST_BROTLI_DECODE) { flushBoundIdx = FLUSH_BOUND_IDX_BROTLI; + } else if (mode === CONST_ZSTD_ENCODE || mode === CONST_ZSTD_DECODE) { + flushBoundIdx = FLUSH_BOUND_IDX_ZSTD; + } else { + flushBoundIdx = FLUSH_BOUND_IDX_NORMAL; } /* eslint-disable-next-line @typescript-eslint/no-unnecessary-condition */ @@ -798,3 +811,85 @@ export class Brotli extends ZlibBase { this._writeState = _writeState; } } + +export const kMaxZstdCParam = Math.max( + ...Object.entries(constants).map(([key, value]) => + key.startsWith('ZSTD_c_') ? 
value : 0 + ) +); +export const zstdInitCParamsArray = new Int32Array(kMaxZstdCParam + 1); + +export const kMaxZstdDParam = Math.max( + ...Object.entries(constants).map(([key, value]) => + key.startsWith('ZSTD_d_') ? value : 0 + ) +); +export const zstdInitDParamsArray = new Int32Array(kMaxZstdDParam + 1); + +const zstdDefaultOptions: ZlibDefaultOptions = { + flush: CONST_ZSTD_e_continue, + finishFlush: CONST_ZSTD_e_end, + fullFlush: CONST_ZSTD_e_flush, +}; + +export class Zstd extends ZlibBase { + constructor( + options: ZstdOptions | undefined | null, + mode: number, + initParamsArray: Int32Array, + maxParam: number + ) { + ok(mode === CONST_ZSTD_DECODE || mode === CONST_ZSTD_ENCODE); + initParamsArray.fill(-1); + + if (options?.params) { + for (const [origKey, value] of Object.entries(options.params)) { + const key = +origKey; + if ( + Number.isNaN(key) || + key < 0 || + key > maxParam || + ((initParamsArray[key] as number) | 0) !== -1 + ) { + throw new ERR_ZSTD_INVALID_PARAM(origKey); + } + + if (typeof value !== 'number' && typeof value !== 'boolean') { + throw new ERR_INVALID_ARG_TYPE( + 'options.params[key]', + 'number', + value + ); + } + // as number is required to avoid force type coercion on runtime. + // boolean has number representation, but typescript doesn't understand it. + initParamsArray[key] = value as number; + } + } + + const handle = + mode === CONST_ZSTD_DECODE + ? new zlibUtil.ZstdDecoder(mode) + : new zlibUtil.ZstdEncoder(mode); + + const _writeState = new Uint32Array(2); + + const pledgedSrcSize = options?.pledgedSrcSize; + + if ( + !handle.initialize( + initParamsArray, + _writeState, + () => { + queueMicrotask(processCallback.bind(handle)); + }, + pledgedSrcSize + ) + ) { + throw new ERR_ZLIB_INITIALIZATION_FAILED(); + } + + super(options ?? 
{}, mode, handle, zstdDefaultOptions); + this._writeState = _writeState; + } +} diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index f0e9e301867..3bbc60ed28c 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -36,6 +36,26 @@ export function brotliCompress( cb: InternalCompressCallback ): void; +export function zstdDecompressSync( + data: ArrayBufferView | string, + options: ZstdOptions +): ArrayBuffer; +export function zstdDecompress( + data: ArrayBufferView | string, + options: ZstdOptions, + cb: InternalCompressCallback +): void; + +export function zstdCompressSync( + data: ArrayBufferView | string, + options: ZstdOptions +): ArrayBuffer; +export function zstdCompress( + data: ArrayBufferView | string, + options: ZstdOptions, + cb: InternalCompressCallback +): void; + // zlib.constants (part of the API contract for node:zlib) export const CONST_Z_NO_FLUSH: number; export const CONST_Z_PARTIAL_FLUSH: number; @@ -74,6 +94,8 @@ export const CONST_INFLATERAW: number; export const CONST_UNZIP: number; export const CONST_BROTLI_DECODE: number; export const CONST_BROTLI_ENCODE: number; +export const CONST_ZSTD_ENCODE: number; +export const CONST_ZSTD_DECODE: number; export const CONST_Z_MIN_WINDOWBITS: number; export const CONST_Z_MAX_WINDOWBITS: number; @@ -150,6 +172,49 @@ export const CONST_BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: number; export const CONST_BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: number; export const CONST_BROTLI_DECODER_ERROR_UNREACHABLE: number; +// Zstd flush directives +export const CONST_ZSTD_e_continue: number; +export const CONST_ZSTD_e_flush: number; +export const CONST_ZSTD_e_end: number; + +// Zstd compression parameters +export const CONST_ZSTD_c_compressionLevel: number; +export const CONST_ZSTD_c_windowLog: number; +export const CONST_ZSTD_c_hashLog: number; +export const CONST_ZSTD_c_chainLog: number; +export const CONST_ZSTD_c_searchLog: number; +export const 
CONST_ZSTD_c_minMatch: number; +export const CONST_ZSTD_c_targetLength: number; +export const CONST_ZSTD_c_strategy: number; +export const CONST_ZSTD_c_enableLongDistanceMatching: number; +export const CONST_ZSTD_c_ldmHashLog: number; +export const CONST_ZSTD_c_ldmMinMatch: number; +export const CONST_ZSTD_c_ldmBucketSizeLog: number; +export const CONST_ZSTD_c_ldmHashRateLog: number; +export const CONST_ZSTD_c_contentSizeFlag: number; +export const CONST_ZSTD_c_checksumFlag: number; +export const CONST_ZSTD_c_dictIDFlag: number; +export const CONST_ZSTD_c_nbWorkers: number; +export const CONST_ZSTD_c_jobSize: number; +export const CONST_ZSTD_c_overlapLog: number; + +// Zstd decompression parameters +export const CONST_ZSTD_d_windowLogMax: number; + +// Zstd strategy constants +export const CONST_ZSTD_fast: number; +export const CONST_ZSTD_dfast: number; +export const CONST_ZSTD_greedy: number; +export const CONST_ZSTD_lazy: number; +export const CONST_ZSTD_lazy2: number; +export const CONST_ZSTD_btlazy2: number; +export const CONST_ZSTD_btopt: number; +export const CONST_ZSTD_btultra: number; +export const CONST_ZSTD_btultra2: number; + +// Zstd default compression level +export const CONST_ZSTD_CLEVEL_DEFAULT: number; + export interface ZlibOptions { flush?: number | undefined; finishFlush?: number | undefined; @@ -177,6 +242,21 @@ export interface BrotliOptions { info?: boolean | undefined; } +export interface ZstdOptions { + flush?: number | undefined; + finishFlush?: number | undefined; + chunkSize?: number | undefined; + params?: + | { + [key: number]: boolean | number; + } + | undefined; + maxOutputLength?: number | undefined; + pledgedSrcSize?: number | undefined; + // Not specified in NodeJS docs but the tests expect it + info?: boolean | undefined; +} + type ErrorHandler = (errno: number, code: string, message: string) => void; type ProcessHandler = () => void; @@ -246,3 +326,23 @@ export class BrotliEncoder extends CompressionStream { ): boolean; 
params(): void; } + +export class ZstdDecoder extends CompressionStream { + initialize( + params: Int32Array, + writeResult: Uint32Array, + writeCallback: () => void, + pledgedSrcSize?: number + ): boolean; + params(): void; +} + +export class ZstdEncoder extends CompressionStream { + initialize( + params: Int32Array, + writeResult: Uint32Array, + writeCallback: () => void, + pledgedSrcSize?: number + ): boolean; + params(): void; +} diff --git a/src/node/zlib.ts b/src/node/zlib.ts index 946bf339c79..f1c08ddcf80 100644 --- a/src/node/zlib.ts +++ b/src/node/zlib.ts @@ -46,6 +46,8 @@ const { UNZIP, BROTLI_DECODE, BROTLI_ENCODE, + ZSTD_ENCODE, + ZSTD_DECODE, Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, @@ -141,6 +143,8 @@ const InflateRaw = protectMethod(zlib.InflateRaw); const Unzip = protectMethod(zlib.Unzip); const BrotliCompress = protectMethod(zlib.BrotliCompress); const BrotliDecompress = protectMethod(zlib.BrotliDecompress); +const ZstdCompress = protectMethod(zlib.ZstdCompress); +const ZstdDecompress = protectMethod(zlib.ZstdDecompress); const createGzip = protectMethod(zlib.createGzip); const createGunzip = protectMethod(zlib.createGunzip); @@ -151,6 +155,8 @@ const createInflateRaw = protectMethod(zlib.createInflateRaw); const createUnzip = protectMethod(zlib.createUnzip); const createBrotliCompress = protectMethod(zlib.createBrotliCompress); const createBrotliDecompress = protectMethod(zlib.createBrotliDecompress); +const createZstdCompress = protectMethod(zlib.createZstdCompress); +const createZstdDecompress = protectMethod(zlib.createZstdDecompress); const inflate = protectMethod(zlib.inflate); const inflateSync = protectMethod(zlib.inflateSync); @@ -170,6 +176,10 @@ const brotliCompress = protectMethod(zlib.brotliCompress); const brotliCompressSync = protectMethod(zlib.brotliCompressSync); const brotliDecompress = protectMethod(zlib.brotliDecompress); const brotliDecompressSync = protectMethod(zlib.brotliDecompressSync); +const zstdCompress = 
protectMethod(zlib.zstdCompress); +const zstdCompressSync = protectMethod(zlib.zstdCompressSync); +const zstdDecompress = protectMethod(zlib.zstdDecompress); +const zstdDecompressSync = protectMethod(zlib.zstdDecompressSync); export { crc32, @@ -186,6 +196,8 @@ export { Unzip, BrotliCompress, BrotliDecompress, + ZstdCompress, + ZstdDecompress, // Convenience methods to create classes createGzip, @@ -197,6 +209,8 @@ export { createUnzip, createBrotliCompress, createBrotliDecompress, + createZstdCompress, + createZstdDecompress, // One-shot methods inflate, @@ -217,6 +231,10 @@ export { brotliDecompressSync, brotliCompress, brotliCompressSync, + zstdCompress, + zstdCompressSync, + zstdDecompress, + zstdDecompressSync, // NodeJS also exports all constants directly under zlib, but this is deprecated Z_NO_FLUSH, @@ -253,6 +271,8 @@ export { UNZIP, BROTLI_DECODE, BROTLI_ENCODE, + ZSTD_ENCODE, + ZSTD_DECODE, Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_DEFAULT_WINDOWBITS, @@ -344,6 +364,8 @@ export default { Unzip, BrotliCompress, BrotliDecompress, + ZstdCompress, + ZstdDecompress, // Convenience methods to create classes createGzip, @@ -355,6 +377,8 @@ export default { createUnzip, createBrotliCompress, createBrotliDecompress, + createZstdCompress, + createZstdDecompress, // One-shot methods inflate, @@ -375,4 +399,8 @@ export default { brotliDecompressSync, brotliCompress, brotliCompressSync, + zstdCompress, + zstdCompressSync, + zstdDecompress, + zstdDecompressSync, }; diff --git a/src/workerd/api/node/BUILD.bazel b/src/workerd/api/node/BUILD.bazel index b756602cfef..3f3b3de96f1 100644 --- a/src/workerd/api/node/BUILD.bazel +++ b/src/workerd/api/node/BUILD.bazel @@ -52,6 +52,7 @@ wd_cc_library( "//src/workerd/util:mimetype", "@capnp-cpp//src/kj/compat:kj-brotli", "@ncrypto", + "@zstd", ], ) diff --git a/src/workerd/api/node/tests/BUILD.bazel b/src/workerd/api/node/tests/BUILD.bazel index 110a4f596e4..b12b20afdeb 100644 --- a/src/workerd/api/node/tests/BUILD.bazel +++ 
b/src/workerd/api/node/tests/BUILD.bazel @@ -216,6 +216,13 @@ wd_test( data = ["zlib-nodejs-test.js"], ) +wd_test( + size = "large", + src = "zlib-zstd-nodejs-test.wd-test", + args = ["--experimental"], + data = ["zlib-zstd-nodejs-test.js"], +) + wd_test( src = "module-nodejs-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.js b/src/workerd/api/node/tests/zlib-nodejs-test.js index 1f8494ce3e8..5386213c810 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-nodejs-test.js @@ -333,6 +333,41 @@ export const constantsTest = { 'INFLATERAW', 'UNZIP', 'ZLIB_VERNUM', + 'ZSTD_CLEVEL_DEFAULT', + 'ZSTD_DECODE', + 'ZSTD_ENCODE', + 'ZSTD_btlazy2', + 'ZSTD_btopt', + 'ZSTD_btultra', + 'ZSTD_btultra2', + 'ZSTD_c_chainLog', + 'ZSTD_c_checksumFlag', + 'ZSTD_c_compressionLevel', + 'ZSTD_c_contentSizeFlag', + 'ZSTD_c_dictIDFlag', + 'ZSTD_c_enableLongDistanceMatching', + 'ZSTD_c_hashLog', + 'ZSTD_c_jobSize', + 'ZSTD_c_ldmBucketSizeLog', + 'ZSTD_c_ldmHashLog', + 'ZSTD_c_ldmHashRateLog', + 'ZSTD_c_ldmMinMatch', + 'ZSTD_c_minMatch', + 'ZSTD_c_nbWorkers', + 'ZSTD_c_overlapLog', + 'ZSTD_c_searchLog', + 'ZSTD_c_strategy', + 'ZSTD_c_targetLength', + 'ZSTD_c_windowLog', + 'ZSTD_d_windowLogMax', + 'ZSTD_dfast', + 'ZSTD_e_continue', + 'ZSTD_e_end', + 'ZSTD_e_flush', + 'ZSTD_fast', + 'ZSTD_greedy', + 'ZSTD_lazy', + 'ZSTD_lazy2', 'Z_BEST_COMPRESSION', 'Z_BEST_SPEED', 'Z_BLOCK', diff --git a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js new file mode 100644 index 00000000000..1274715095b --- /dev/null +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js @@ -0,0 +1,377 @@ +import assert from 'node:assert'; +import { Buffer } from 'node:buffer'; +import zlib from 'node:zlib'; + +// Basic sync compress/decompress test +export const zstdBasicSyncTest = { + test() { + const input = Buffer.from('Hello, Zstd compression!'); + const compressed = 
zlib.zstdCompressSync(input); + assert(Buffer.isBuffer(compressed), 'Compressed output should be a buffer'); + assert(compressed.length > 0, 'Compressed output should not be empty'); + + const decompressed = zlib.zstdDecompressSync(compressed); + assert( + Buffer.isBuffer(decompressed), + 'Decompressed output should be a buffer' + ); + assert.strictEqual( + decompressed.toString(), + input.toString(), + 'Round-trip should match' + ); + }, +}; + +// Basic async compress/decompress test +export const zstdBasicAsyncTest = { + async test() { + const input = Buffer.from('Hello, async Zstd compression!'); + + const { + promise: compressPromise, + resolve: compressResolve, + reject: compressReject, + } = Promise.withResolvers(); + zlib.zstdCompress(input, (err, res) => { + if (err) compressReject(err); + else compressResolve(res); + }); + const compressed = await compressPromise; + + assert(Buffer.isBuffer(compressed), 'Compressed output should be a buffer'); + assert(compressed.length > 0, 'Compressed output should not be empty'); + + const { + promise: decompressPromise, + resolve: decompressResolve, + reject: decompressReject, + } = Promise.withResolvers(); + zlib.zstdDecompress(compressed, (err, res) => { + if (err) decompressReject(err); + else decompressResolve(res); + }); + const decompressed = await decompressPromise; + + assert( + Buffer.isBuffer(decompressed), + 'Decompressed output should be a buffer' + ); + assert.strictEqual( + decompressed.toString(), + input.toString(), + 'Round-trip should match' + ); + }, +}; + +// Test with string input +export const zstdStringInputTest = { + test() { + const input = 'This is a string input for Zstd compression'; + const compressed = zlib.zstdCompressSync(input); + const decompressed = zlib.zstdDecompressSync(compressed); + assert.strictEqual( + decompressed.toString(), + input, + 'String input round-trip should match' + ); + }, +}; + +// Test with larger data +export const zstdLargeDataTest = { + test() { + // Create a 
100KB buffer with repetitive content (compresses well) + const input = Buffer.alloc(100 * 1024); + for (let i = 0; i < input.length; i++) { + input[i] = i % 256; + } + + const compressed = zlib.zstdCompressSync(input); + assert( + compressed.length < input.length, + 'Compressed should be smaller than input' + ); + + const decompressed = zlib.zstdDecompressSync(compressed); + assert(input.equals(decompressed), 'Large data round-trip should match'); + }, +}; + +// Test with different compression levels +export const zstdCompressionLevelsTest = { + test() { + const input = Buffer.from( + 'Test data for compression level testing'.repeat(100) + ); + + // Test compression level 1 (fastest) + const compressedFast = zlib.zstdCompressSync(input, { + params: { [zlib.constants.ZSTD_c_compressionLevel]: 1 }, + }); + + // Test compression level 19 (best compression) + const compressedBest = zlib.zstdCompressSync(input, { + params: { [zlib.constants.ZSTD_c_compressionLevel]: 19 }, + }); + + // Both should decompress correctly + const decompressedFast = zlib.zstdDecompressSync(compressedFast); + const decompressedBest = zlib.zstdDecompressSync(compressedBest); + + assert( + input.equals(decompressedFast), + 'Fast compression should decompress correctly' + ); + assert( + input.equals(decompressedBest), + 'Best compression should decompress correctly' + ); + + // Higher compression level should typically produce smaller output + assert( + compressedBest.length <= compressedFast.length, + 'Higher compression level should produce smaller or equal output' + ); + }, +}; + +// Test with default compression level +export const zstdDefaultCompressionTest = { + test() { + const input = Buffer.from('Testing default compression level'); + const compressed = zlib.zstdCompressSync(input); + const decompressed = zlib.zstdDecompressSync(compressed); + assert.strictEqual( + decompressed.toString(), + input.toString(), + 'Default compression should work' + ); + }, +}; + +// Test stream API +export 
const zstdStreamTest = { + async test() { + const input = Buffer.from('Stream compression test data'.repeat(50)); + + // Create compress stream + const compress = zlib.createZstdCompress(); + const decompress = zlib.createZstdDecompress(); + + const chunks = []; + + // Pipe through compress + compress.on('data', (chunk) => chunks.push(chunk)); + + await new Promise((resolve, reject) => { + compress.on('end', resolve); + compress.on('error', reject); + compress.end(input); + }); + + const compressed = Buffer.concat(chunks); + assert(compressed.length > 0, 'Stream should produce output'); + + // Decompress + const decompressedChunks = []; + decompress.on('data', (chunk) => decompressedChunks.push(chunk)); + + await new Promise((resolve, reject) => { + decompress.on('end', resolve); + decompress.on('error', reject); + decompress.end(compressed); + }); + + const decompressed = Buffer.concat(decompressedChunks); + assert(input.equals(decompressed), 'Stream round-trip should match'); + }, +}; + +// Test empty input +export const zstdEmptyInputTest = { + test() { + const input = Buffer.alloc(0); + const compressed = zlib.zstdCompressSync(input); + const decompressed = zlib.zstdDecompressSync(compressed); + assert.strictEqual( + decompressed.length, + 0, + 'Empty input should produce empty output' + ); + }, +}; + +// Test invalid compressed data +export const zstdInvalidDataTest = { + test() { + const invalidData = Buffer.from('This is not valid zstd compressed data'); + assert.throws( + () => zlib.zstdDecompressSync(invalidData), + /Zstd decompression failed/, + 'Should throw on invalid compressed data' + ); + }, +}; + +// Test chunkSize option +export const zstdChunkSizeTest = { + test() { + const input = Buffer.from('Testing chunk size option'.repeat(100)); + const compressed = zlib.zstdCompressSync(input, { chunkSize: 1024 }); + const decompressed = zlib.zstdDecompressSync(compressed, { + chunkSize: 1024, + }); + assert( + input.equals(decompressed), + 'Custom 
chunkSize should work correctly' + ); + }, +}; + +// Test maxOutputLength option +export const zstdMaxOutputLengthTest = { + test() { + const input = Buffer.from('A'.repeat(1000)); + const compressed = zlib.zstdCompressSync(input); + + // Try to decompress with a maxOutputLength that's too small + assert.throws( + () => zlib.zstdDecompressSync(compressed, { maxOutputLength: 10 }), + /Memory limit exceeded/, + 'Should throw when maxOutputLength is exceeded' + ); + }, +}; + +// Test callback error handling +export const zstdCallbackErrorTest = { + async test() { + const invalidData = Buffer.from('invalid zstd data'); + + const { promise, resolve, reject } = Promise.withResolvers(); + zlib.zstdDecompress(invalidData, (err, res) => { + if (err) reject(err); + else resolve(res); + }); + + try { + await promise; + assert.fail('Should have thrown'); + } catch (err) { + assert(err instanceof Error, 'Should receive an error'); + } + }, +}; + +// Test with params for strategy +export const zstdStrategyTest = { + test() { + const input = Buffer.from('Testing compression strategy'.repeat(50)); + + // Test with ZSTD_fast strategy + const compressedFast = zlib.zstdCompressSync(input, { + params: { + [zlib.constants.ZSTD_c_strategy]: zlib.constants.ZSTD_fast, + }, + }); + + const decompressed = zlib.zstdDecompressSync(compressedFast); + assert(input.equals(decompressed), 'Strategy option should work correctly'); + }, +}; + +// Test with info option +export const zstdInfoOptionTest = { + test() { + const input = Buffer.from('Testing info option'); + const result = zlib.zstdCompressSync(input, { info: true }); + + // When info is true, result should be an object with buffer and engine properties + assert( + typeof result === 'object', + 'Result should be an object when info is true' + ); + assert(result.buffer, 'Result should have a buffer property'); + assert(result.engine, 'Result should have an engine property'); + }, +}; + +// Test classes are exported +export const 
zstdClassesExportedTest = { + test() { + assert.strictEqual( + typeof zlib.ZstdCompress, + 'function', + 'ZstdCompress should be exported' + ); + assert.strictEqual( + typeof zlib.ZstdDecompress, + 'function', + 'ZstdDecompress should be exported' + ); + assert.strictEqual( + typeof zlib.createZstdCompress, + 'function', + 'createZstdCompress should be exported' + ); + assert.strictEqual( + typeof zlib.createZstdDecompress, + 'function', + 'createZstdDecompress should be exported' + ); + }, +}; + +// Test sync functions are exported +export const zstdSyncFunctionsExportedTest = { + test() { + assert.strictEqual( + typeof zlib.zstdCompressSync, + 'function', + 'zstdCompressSync should be exported' + ); + assert.strictEqual( + typeof zlib.zstdDecompressSync, + 'function', + 'zstdDecompressSync should be exported' + ); + assert.strictEqual( + typeof zlib.zstdCompress, + 'function', + 'zstdCompress should be exported' + ); + assert.strictEqual( + typeof zlib.zstdDecompress, + 'function', + 'zstdDecompress should be exported' + ); + }, +}; + +// Test stream decompression with large output and small chunkSize to exercise +// the processCallback recursion path; verify we don't get a stack overflow. +export const zstdStreamLargeDecompressTest = { + async test() { + // 1MB of compressible data — compresses small, decompresses large. + // With chunkSize=64, this requires ~16,000 output chunks, which triggers + // deep recursion in processCallback if the callback is synchronous. 
+ const input = Buffer.alloc(1024 * 1024, 'A'); + const compressed = zlib.zstdCompressSync(input); + + const decompress = zlib.createZstdDecompress({ chunkSize: 64 }); + let totalBytes = 0; + + decompress.end(compressed); + for await (const chunk of decompress) { + totalBytes += chunk.length; + } + + assert.strictEqual( + totalBytes, + input.length, + `Decompressed size should be ${input.length}, got ${totalBytes}` + ); + }, +}; diff --git a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.wd-test b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.wd-test new file mode 100644 index 00000000000..9283590a92d --- /dev/null +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.wd-test @@ -0,0 +1,14 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "nodejs-zstd-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "zlib-zstd-nodejs-test.js") + ], + compatibilityFlags = ["experimental", "nodejs_compat", "nodejs_compat_v2", "nodejs_zlib"], + ) + ), + ], +); diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index 95f85e44c6c..919292e2d62 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -755,6 +755,231 @@ kj::Maybe BrotliDecoderContext::getError() const { return kj::none; } +// ======================================================================================= +// Zstd Implementation + +void ZstdContext::setBuffers(kj::ArrayPtr input, kj::ArrayPtr output) { + setInputBuffer(input); + setOutputBuffer(output); +} + +void ZstdContext::setInputBuffer(kj::ArrayPtr input) { + input_.src = input.begin(); + input_.size = input.size(); + input_.pos = 0; +} + +void ZstdContext::setOutputBuffer(kj::ArrayPtr output) { + output_.dst = output.begin(); + output_.size = output.size(); + output_.pos = 0; +} + +void ZstdContext::setFlush(int flush) { + KJ_DASSERT(flush >= ZSTD_e_continue && flush <= ZSTD_e_end, + "flush must 
be a valid ZSTD_EndDirective value"); + flush_ = static_cast(flush); +} + +kj::uint ZstdContext::getAvailOut() const { + return output_.size - output_.pos; +} + +void ZstdContext::getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const { + *availIn = input_.size - input_.pos; + *availOut = output_.size - output_.pos; +} + +namespace { +// Helper to check ZSTD errors and return a CompressionError if present. +// Also sets the error code in the provided reference for later retrieval. +kj::Maybe zstdCheckError( + size_t result, ZSTD_ErrorCode& error, kj::StringPtr errorCode) { + if (ZSTD_isError(result)) { + error = ZSTD_getErrorCode(result); + return CompressionError(ZSTD_getErrorName(result), errorCode, -1); + } + return kj::none; +} + +// Wrappers for ZSTD free functions that return void (for use with kj::disposeWith). +void zstdFreeCCtx(ZSTD_CCtx* cctx) { + ZSTD_freeCCtx(cctx); +} +void zstdFreeDCtx(ZSTD_DCtx* dctx) { + ZSTD_freeDCtx(dctx); +} +} // namespace + +ZstdEncoderContext::ZstdEncoderContext(ZlibMode _mode) + : ZstdContext(_mode), + cctx_(kj::disposeWith(ZSTD_createCCtx())) {} + +kj::Maybe ZstdEncoderContext::initialize(uint64_t pledgedSrcSize) { + if (cctx_.get() == nullptr) { + return CompressionError( + "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); + } + + if (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN) { + size_t result = ZSTD_CCtx_setPledgedSrcSize(cctx_.get(), pledgedSrcSize); + KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_COMPRESSION_FAILED"_kj)) { + return kj::mv(err); + } + } + + return kj::none; +} + +void ZstdEncoderContext::work() { + JSG_REQUIRE(mode == ZlibMode::ZSTD_ENCODE, Error, "Mode should be ZSTD_ENCODE"_kj); + JSG_REQUIRE(cctx_.get() != nullptr, Error, "Zstd context should not be null"_kj); + + lastResult = ZSTD_compressStream2(cctx_.get(), &output_, &input_, flush_); + + if (ZSTD_isError(lastResult)) { + error_ = ZSTD_getErrorCode(lastResult); + } +} + +kj::Maybe 
ZstdEncoderContext::resetStream() { + if (cctx_.get() != nullptr) { + size_t result = ZSTD_CCtx_reset(cctx_.get(), ZSTD_reset_session_only); + KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_COMPRESSION_FAILED"_kj)) { + return kj::mv(err); + } + } + return kj::none; +} + +kj::Maybe ZstdEncoderContext::setParams(int key, int value) { + KJ_DASSERT(key >= ZSTD_c_compressionLevel, + "key must be a valid ZSTD_cParameter (first valid value is ZSTD_c_compressionLevel)"); + size_t result = ZSTD_CCtx_setParameter(cctx_.get(), static_cast(key), value); + if (ZSTD_isError(result)) { + return CompressionError(kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), + "ERR_ZSTD_PARAM_SET_FAILED"_kj, -1); + } + return kj::none; +} + +kj::Maybe ZstdEncoderContext::getError() const { + if (error_ != ZSTD_error_no_error) { + return CompressionError(kj::str("Zstd compression failed: ", ZSTD_getErrorString(error_)), + kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); + } + + if (flush_ == ZSTD_e_end && lastResult != 0) { + // lastResult > 0 means more output is needed, which shouldn't happen at end + return CompressionError("Unexpected end of file"_kj, "Z_BUF_ERROR"_kj, Z_BUF_ERROR); + } + + return kj::none; +} + +ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode) + : ZstdContext(_mode), + dctx_(kj::disposeWith(ZSTD_createDCtx())) {} + +kj::Maybe ZstdDecoderContext::initialize() { + // dctx_ is created in the constructor. It can only be nullptr if ZSTD_createDCtx() + // failed due to memory allocation failure. 
+ if (dctx_.get() == nullptr) { + return CompressionError( + "Could not initialize Zstd instance"_kj, "ERR_ZLIB_INITIALIZATION_FAILED"_kj, -1); + } + + return kj::none; +} + +void ZstdDecoderContext::work() { + JSG_REQUIRE(mode == ZlibMode::ZSTD_DECODE, Error, "Mode should be ZSTD_DECODE"_kj); + JSG_REQUIRE(dctx_.get() != nullptr, Error, "Zstd context should not be null"_kj); + + lastResult = ZSTD_decompressStream(dctx_.get(), &output_, &input_); + + if (ZSTD_isError(lastResult)) { + error_ = ZSTD_getErrorCode(lastResult); + } +} + +kj::Maybe ZstdDecoderContext::resetStream() { + if (dctx_.get() != nullptr) { + size_t result = ZSTD_DCtx_reset(dctx_.get(), ZSTD_reset_session_only); + KJ_IF_SOME(err, zstdCheckError(result, error_, "ERR_ZSTD_DECOMPRESSION_FAILED"_kj)) { + return kj::mv(err); + } + } + return kj::none; +} + +kj::Maybe ZstdDecoderContext::setParams(int key, int value) { + size_t result = ZSTD_DCtx_setParameter(dctx_.get(), static_cast(key), value); + if (ZSTD_isError(result)) { + return CompressionError(kj::str("Setting parameter failed: ", ZSTD_getErrorName(result)), + "ERR_ZSTD_PARAM_SET_FAILED"_kj, -1); + } + return kj::none; +} + +kj::Maybe ZstdDecoderContext::getError() const { + if (error_ != ZSTD_error_no_error) { + return CompressionError(kj::str("Zstd decompression failed: ", ZSTD_getErrorString(error_)), + kj::str("ERR_ZSTD_DECOMPRESSION_FAILED"), -1); + } + + // For decompression, lastResult == 0 means frame is complete + // If we have flush_ == ZSTD_e_end equivalent and there's input left, that's an error + if (flush_ == ZSTD_e_end && input_.pos < input_.size && lastResult == 0) { + // Frame completed but there's still input - could be trailing data + } + + return kj::none; +} + +template +jsg::Ref> ZlibUtil::ZstdCompressionStream< + CompressionContext>::constructor(jsg::Lock& js, ZlibModeValue mode) { + return js.alloc(static_cast(mode), js.getExternalMemoryTarget()); +} + +template +bool 
ZlibUtil::ZstdCompressionStream::initialize(jsg::Lock& js, + jsg::BufferSource params, + jsg::BufferSource writeResult, + jsg::Function writeCallback, + jsg::Optional pledgedSrcSize) { + this->initializeStream(kj::mv(writeResult), kj::mv(writeCallback)); + + uint64_t srcSize = pledgedSrcSize.orDefault(ZSTD_CONTENTSIZE_UNKNOWN); + + kj::Maybe maybeError; + if constexpr (CompressionContext::Mode == ZlibMode::ZSTD_ENCODE) { + maybeError = this->context()->initialize(srcSize); + } else { + maybeError = this->context()->initialize(); + } + + KJ_IF_SOME(err, maybeError) { + this->emitError(js, kj::mv(err)); + return false; + } + + auto results = params.template asArrayPtr(); + + for (size_t i = 0; i < results.size(); i++) { + if (results[i] == -1) { + continue; + } + + KJ_IF_SOME(err, this->context()->setParams(i, results[i])) { + this->emitError(js, kj::mv(err)); + return false; + } + } + return true; +} + template jsg::Ref> ZlibUtil::BrotliCompressionStream< CompressionContext>::constructor(jsg::Lock& js, ZlibModeValue mode) { @@ -934,6 +1159,71 @@ void ZlibUtil::brotliWithCallback( cb(js, kj::mv(res)); } +template +kj::Array ZlibUtil::zstdSync(jsg::Lock& js, InputSource data, ZstdContext::Options opts) { + Context ctx(Context::Mode); + + auto chunkSize = opts.chunkSize.orDefault(ZLIB_PERFORMANT_CHUNK_SIZE); + auto maxOutputLength = opts.maxOutputLength.orDefault(Z_MAX_CHUNK); + + JSG_REQUIRE(Z_MIN_CHUNK <= chunkSize && chunkSize <= Z_MAX_CHUNK, RangeError, + kj::str("The value of \"options.chunkSize\" is out of range. It must be >= ", Z_MIN_CHUNK, + " and <= ", Z_MAX_CHUNK, ". Received ", chunkSize)); + JSG_REQUIRE(maxOutputLength >= 1 && maxOutputLength <= Z_MAX_CHUNK, RangeError, + kj::str("The value of \"options.maxOutputLength\" is out of range. It must be >= 1 and <= ", + Z_MAX_CHUNK, ". 
Received ", maxOutputLength)); + GrowableBuffer result(ZLIB_PERFORMANT_CHUNK_SIZE, maxOutputLength); + + // Initialize the context + if constexpr (Context::Mode == ZlibMode::ZSTD_ENCODE) { + uint64_t pledgedSrcSize = opts.pledgedSrcSize.orDefault(ZSTD_CONTENTSIZE_UNKNOWN); + KJ_IF_SOME(err, ctx.initialize(pledgedSrcSize)) { + JSG_FAIL_REQUIRE(Error, err.message); + } + } else { + KJ_IF_SOME(err, ctx.initialize()) { + JSG_FAIL_REQUIRE(Error, err.message); + } + } + + // Set parameters + KJ_IF_SOME(params, opts.params) { + for (const auto& field: params.fields) { + KJ_IF_SOME(err, ctx.setParams(field.name.parseAs(), field.value)) { + JSG_FAIL_REQUIRE(Error, err.message); + } + } + } + + auto flush = opts.flush.orDefault(ZSTD_e_continue); + JSG_REQUIRE(ZSTD_e_continue <= flush && flush <= ZSTD_e_end, RangeError, + kj::str("The value of \"options.flush\" is out of range. It must be >= ", ZSTD_e_continue, + " and <= ", ZSTD_e_end, ". Received ", flush)); + + auto finishFlush = opts.finishFlush.orDefault(ZSTD_e_end); + JSG_REQUIRE(ZSTD_e_continue <= finishFlush && finishFlush <= ZSTD_e_end, RangeError, + kj::str("The value of \"options.finishFlush\" is out of range. It must be >= ", + ZSTD_e_continue, " and <= ", ZSTD_e_end, ". 
Received ", finishFlush)); + + ctx.setFlush(finishFlush); + ctx.setInputBuffer(getInputFromSource(data)); + return syncProcessBuffer(ctx, result); +} + +template +void ZlibUtil::zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb) { + // Capture only relevant errors so they can be passed to the callback + auto res = js.tryCatch([&]() { + return CompressCallbackArg(zstdSync(js, kj::mv(data), kj::mv(options))); + }, [&](jsg::Value&& exception) { + return CompressCallbackArg(jsg::JsValue(exception.getHandle(js))); + }); + + // Ensure callback is invoked only once + cb(js, kj::mv(res)); +} + #ifndef CREATE_TEMPLATE #define CREATE_TEMPLATE(T) \ template class ZlibUtil::CompressionStream; \ @@ -947,10 +1237,15 @@ void ZlibUtil::brotliWithCallback( CREATE_TEMPLATE(ZlibContext) CREATE_TEMPLATE(BrotliEncoderContext) CREATE_TEMPLATE(BrotliDecoderContext) +CREATE_TEMPLATE(ZstdEncoderContext) +CREATE_TEMPLATE(ZstdDecoderContext) template class ZlibUtil::BrotliCompressionStream; template class ZlibUtil::BrotliCompressionStream; +template class ZlibUtil::ZstdCompressionStream; +template class ZlibUtil::ZstdCompressionStream; + template kj::Array ZlibUtil::brotliSync( jsg::Lock& js, InputSource data, BrotliContext::Options opts); template kj::Array ZlibUtil::brotliSync( @@ -959,6 +1254,15 @@ template void ZlibUtil::brotliWithCallback( jsg::Lock& js, InputSource data, BrotliContext::Options options, CompressCallback cb); template void ZlibUtil::brotliWithCallback( jsg::Lock& js, InputSource data, BrotliContext::Options options, CompressCallback cb); + +template kj::Array ZlibUtil::zstdSync( + jsg::Lock& js, InputSource data, ZstdContext::Options opts); +template kj::Array ZlibUtil::zstdSync( + jsg::Lock& js, InputSource data, ZstdContext::Options opts); +template void ZlibUtil::zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb); +template void ZlibUtil::zstdWithCallback( + 
jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb); #undef CREATE_TEMPLATE #endif } // namespace workerd::api::node diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index ca5f2a63dbf..af2a97b08ad 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include @@ -71,7 +73,9 @@ enum class ZlibMode : ZlibModeValue { INFLATERAW, UNZIP, BROTLI_DECODE, - BROTLI_ENCODE + BROTLI_ENCODE, + ZSTD_ENCODE, + ZSTD_DECODE }; // When possible, we intentionally override chunkSize to a value that is likely to perform better @@ -283,6 +287,74 @@ class BrotliDecoderContext final: public BrotliContext { kj::Own state; }; +class ZstdContext { + public: + explicit ZstdContext(ZlibMode _mode): mode(_mode) {} + KJ_DISALLOW_COPY(ZstdContext); + + void setBuffers(kj::ArrayPtr input, kj::ArrayPtr output); + void setInputBuffer(kj::ArrayPtr input); + void setOutputBuffer(kj::ArrayPtr output); + void setFlush(int flush); + kj::uint getAvailOut() const; + void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const; + void setMode(ZlibMode _mode) { + mode = _mode; + } + + struct Options { + jsg::Optional flush; + jsg::Optional finishFlush; + jsg::Optional chunkSize; + jsg::Optional> params; + jsg::Optional maxOutputLength; + jsg::Optional pledgedSrcSize; + JSG_STRUCT(flush, finishFlush, chunkSize, params, maxOutputLength, pledgedSrcSize); + }; + + protected: + ZlibMode mode; + ZSTD_inBuffer input_{nullptr, 0, 0}; + ZSTD_outBuffer output_{nullptr, 0, 0}; + ZSTD_EndDirective flush_ = ZSTD_e_continue; +}; + +class ZstdEncoderContext final: public ZstdContext { + public: + static const ZlibMode Mode = ZlibMode::ZSTD_ENCODE; + explicit ZstdEncoderContext(ZlibMode _mode); + KJ_DISALLOW_COPY_AND_MOVE(ZstdEncoderContext); + + void work(); + kj::Maybe initialize(uint64_t pledgedSrcSize); + kj::Maybe resetStream(); + kj::Maybe 
setParams(int key, int value); + kj::Maybe getError() const; + + private: + size_t lastResult = 0; + kj::Own cctx_; + ZSTD_ErrorCode error_ = ZSTD_error_no_error; +}; + +class ZstdDecoderContext final: public ZstdContext { + public: + static const ZlibMode Mode = ZlibMode::ZSTD_DECODE; + explicit ZstdDecoderContext(ZlibMode _mode); + KJ_DISALLOW_COPY_AND_MOVE(ZstdDecoderContext); + + void work(); + kj::Maybe initialize(); + kj::Maybe resetStream(); + kj::Maybe setParams(int key, int value); + kj::Maybe getError() const; + + private: + size_t lastResult = 0; + kj::Own dctx_; + ZSTD_ErrorCode error_ = ZSTD_error_no_error; +}; + // Implements utilities in support of the Node.js Zlib class ZlibUtil final: public jsg::Object { public: @@ -417,6 +489,37 @@ class ZlibUtil final: public jsg::Object { } }; + template + class ZstdCompressionStream: public CompressionStream { + public: + explicit ZstdCompressionStream( + ZlibMode _mode, kj::Arc&& externalMemoryTarget) + : CompressionStream(_mode, kj::mv(externalMemoryTarget)) {} + KJ_DISALLOW_COPY_AND_MOVE(ZstdCompressionStream); + static jsg::Ref constructor(jsg::Lock& js, ZlibModeValue mode); + + bool initialize(jsg::Lock& js, + jsg::BufferSource params, + jsg::BufferSource writeResult, + jsg::Function writeCallback, + jsg::Optional pledgedSrcSize); + + void params() { + // Currently a no-op, and not accessed from JS land. 
+ } + + JSG_RESOURCE_TYPE(ZstdCompressionStream) { + JSG_INHERIT(CompressionStream); + + JSG_METHOD(initialize); + JSG_METHOD(params); + } + + CompressionContext* context() { + return this->CompressionStream::context(); + } + }; + using InputSource = kj::OneOf, kj::Array>; using CompressCallbackArg = kj::OneOf>; using CompressCallback = jsg::Function; @@ -436,6 +539,12 @@ class ZlibUtil final: public jsg::Object { void brotliWithCallback( jsg::Lock& js, InputSource data, BrotliContext::Options options, CompressCallback cb); + template + kj::Array zstdSync(jsg::Lock& js, InputSource data, ZstdContext::Options options); + template + void zstdWithCallback( + jsg::Lock& js, InputSource data, ZstdContext::Options options, CompressCallback cb); + JSG_RESOURCE_TYPE(ZlibUtil) { JSG_METHOD_NAMED(crc32, crc32Sync); JSG_METHOD(zlibSync); @@ -446,9 +555,16 @@ class ZlibUtil final: public jsg::Object { JSG_METHOD_NAMED(brotliDecompress, template brotliWithCallback); JSG_METHOD_NAMED(brotliCompress, template brotliWithCallback); + JSG_METHOD_NAMED(zstdDecompressSync, template zstdSync); + JSG_METHOD_NAMED(zstdCompressSync, template zstdSync); + JSG_METHOD_NAMED(zstdDecompress, template zstdWithCallback); + JSG_METHOD_NAMED(zstdCompress, template zstdWithCallback); + JSG_NESTED_TYPE(ZlibStream); JSG_NESTED_TYPE_NAMED(BrotliCompressionStream, BrotliEncoder); JSG_NESTED_TYPE_NAMED(BrotliCompressionStream, BrotliDecoder); + JSG_NESTED_TYPE_NAMED(ZstdCompressionStream, ZstdEncoder); + JSG_NESTED_TYPE_NAMED(ZstdCompressionStream, ZstdDecoder); // zlib.constants (part of the API contract for node:zlib) JSG_STATIC_CONSTANT_NAMED(CONST_Z_NO_FLUSH, Z_NO_FLUSH); @@ -597,6 +713,54 @@ class ZlibUtil final: public jsg::Object { BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES); JSG_STATIC_CONSTANT_NAMED( CONST_BROTLI_DECODER_ERROR_UNREACHABLE, BROTLI_DECODER_ERROR_UNREACHABLE); + + // Zstd mode constants + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_ENCODE, static_cast(ZlibMode::ZSTD_ENCODE)); + 
JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_DECODE, static_cast(ZlibMode::ZSTD_DECODE)); + + // Zstd flush directives + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_e_continue, ZSTD_e_continue); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_e_flush, ZSTD_e_flush); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_e_end, ZSTD_e_end); + + // Zstd compression parameters + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_compressionLevel, ZSTD_c_compressionLevel); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_windowLog, ZSTD_c_windowLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_hashLog, ZSTD_c_hashLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_chainLog, ZSTD_c_chainLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_searchLog, ZSTD_c_searchLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_minMatch, ZSTD_c_minMatch); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_targetLength, ZSTD_c_targetLength); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_strategy, ZSTD_c_strategy); + JSG_STATIC_CONSTANT_NAMED( + CONST_ZSTD_c_enableLongDistanceMatching, ZSTD_c_enableLongDistanceMatching); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmHashLog, ZSTD_c_ldmHashLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmMinMatch, ZSTD_c_ldmMinMatch); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmBucketSizeLog, ZSTD_c_ldmBucketSizeLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_ldmHashRateLog, ZSTD_c_ldmHashRateLog); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_contentSizeFlag, ZSTD_c_contentSizeFlag); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_checksumFlag, ZSTD_c_checksumFlag); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_dictIDFlag, ZSTD_c_dictIDFlag); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_nbWorkers, ZSTD_c_nbWorkers); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_jobSize, ZSTD_c_jobSize); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_c_overlapLog, ZSTD_c_overlapLog); + + // Zstd decompression parameters + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_d_windowLogMax, ZSTD_d_windowLogMax); + + // Zstd strategy constants + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_fast, ZSTD_fast); + 
JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_dfast, ZSTD_dfast); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_greedy, ZSTD_greedy); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_lazy, ZSTD_lazy); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_lazy2, ZSTD_lazy2); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btlazy2, ZSTD_btlazy2); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btopt, ZSTD_btopt); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btultra, ZSTD_btultra); + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_btultra2, ZSTD_btultra2); + + // Zstd default compression level + JSG_STATIC_CONSTANT_NAMED(CONST_ZSTD_CLEVEL_DEFAULT, ZSTD_CLEVEL_DEFAULT); } }; @@ -604,12 +768,19 @@ class ZlibUtil final: public jsg::Object { api::node::ZlibUtil, api::node::ZlibUtil::ZlibStream, \ api::node::ZlibUtil::BrotliCompressionStream, \ api::node::ZlibUtil::BrotliCompressionStream, \ + api::node::ZlibUtil::ZstdCompressionStream, \ + api::node::ZlibUtil::ZstdCompressionStream, \ api::node::ZlibUtil::CompressionStream, \ api::node::ZlibUtil::CompressionStream, \ api::node::ZlibUtil::CompressionStream, \ - api::node::ZlibContext::Options, api::node::BrotliContext::Options + api::node::ZlibUtil::CompressionStream, \ + api::node::ZlibUtil::CompressionStream, \ + api::node::ZlibContext::Options, api::node::BrotliContext::Options, \ + api::node::ZstdContext::Options } // namespace workerd::api::node KJ_DECLARE_NON_POLYMORPHIC(BrotliEncoderStateStruct) KJ_DECLARE_NON_POLYMORPHIC(BrotliDecoderStateStruct) +KJ_DECLARE_NON_POLYMORPHIC(ZSTD_CCtx) +KJ_DECLARE_NON_POLYMORPHIC(ZSTD_DCtx)