From 3262e8a64cc193702a17ce4a4adf659c8c2f6c56 Mon Sep 17 00:00:00 2001 From: Sujal Salekar Date: Sat, 17 May 2025 21:44:35 +0000 Subject: [PATCH] Implement Critical Gateway Improvements --- src/index.js | 40 +++--- src/middleware/index.js | 8 +- src/middleware/withContentClaimsDagula.js | 135 ++++++++++++++---- src/middleware/withMemoryBudget.js | 139 +++++++++++++++++++ src/middleware/withRangeCache.js | 162 ++++++++++++++++++++++ 5 files changed, 439 insertions(+), 45 deletions(-) create mode 100644 src/middleware/withMemoryBudget.js create mode 100644 src/middleware/withRangeCache.js diff --git a/src/index.js b/src/index.js index 5754fbd..ab5bf29 100644 --- a/src/index.js +++ b/src/index.js @@ -24,7 +24,9 @@ import { withEgressTracker, withAuthorizedSpace, withLocator, - withDelegationStubs + withDelegationStubs, + withMemoryBudget, + withRangeCache } from './middleware/index.js' import { instrument } from '@microlabs/otel-cf-workers' import { NoopSpanProcessor } from '@opentelemetry/sdk-trace-base' @@ -47,32 +49,34 @@ const handler = { fetch (request, env, ctx) { console.log(request.method, request.url) const middleware = composeMiddleware( - // Prepare the Context - withCdnCache, + // Basic context and error handling withContext, + withErrorHandler, withCorsHeaders, withVersionHeader, - withErrorHandler, + + // Security and resource management + withMemoryBudget, // Add memory budgeting early + withRateLimit, // Rate limiting before expensive operations + withAuthToken, // Authentication + withAuthorizedSpace, // Authorization + + // Request parsing and validation withParsedIpfsUrl, createWithHttpMethod('GET', 'HEAD'), - withAuthToken, + + // Caching and performance + withCdnCache, + withRangeCache, // Range request handling + + // Data fetching and processing withLocator, withDelegationStubs, - - // Rate-limit requests - withRateLimit, - - // Track egress bytes - withEgressTracker, - - // Fetch data withCarBlockHandler, - withAuthorizedSpace, withContentClaimsDagula, - withFormatRawHandler, - withFormatCarHandler, - - // Prepare the Response + withEgressTracker, + + // Response preparation withContentDispositionHeader, withFixedLengthStream ) diff --git a/src/middleware/index.js b/src/middleware/index.js index 78fd617..ba82ab1 100644 --- a/src/middleware/index.js +++ b/src/middleware/index.js @@ -1,9 +1,11 @@ +export { withContentClaimsDagula } from './withContentClaimsDagula.js' +export { withVersionHeader } from './withVersionHeader.js' export { withAuthToken } from './withAuthToken.js' export { withCarBlockHandler } from './withCarBlockHandler.js' -export { withContentClaimsDagula } from './withContentClaimsDagula.js' export { withRateLimit } from './withRateLimit.js' -export { withVersionHeader } from './withVersionHeader.js' +export { withEgressTracker } from './withEgressTracker.js' export { withAuthorizedSpace } from './withAuthorizedSpace.js' export { withLocator } from './withLocator.js' -export { withEgressTracker } from './withEgressTracker.js' export { withDelegationStubs } from './withDelegationStubs.js' +export { withMemoryBudget } from './withMemoryBudget.js' +export { withRangeCache } from './withRangeCache.js' diff --git a/src/middleware/withContentClaimsDagula.js b/src/middleware/withContentClaimsDagula.js index 912d0d9..6302d16 100644 --- a/src/middleware/withContentClaimsDagula.js +++ b/src/middleware/withContentClaimsDagula.js @@ -1,5 +1,7 @@ import { Dagula } from 'dagula' import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching' +import { HttpError } from 'http-errors' +import { ContentClaimsLocator } from '@web3-storage/gateway-lib' /** * @import { @@ -13,35 +15,120 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching' * @import { Environment } from './withContentClaimsDagula.types.js' */ +/** + * Error categories for content claims + */ +const ERROR_TYPES = { + NOT_FOUND: 'NotFound', + UNAUTHORIZED: 'Unauthorized', + RATE_LIMITED: 'RateLimited', + VALIDATION_ERROR: 'ValidationError', + INTERNAL_ERROR: 'InternalError' +} + +/** + * Sanitizes error messages for external consumption + * @param {Error} error - The original error + * @returns {string} - Sanitized error message + */ +function sanitizeErrorMessage(error) { + // Remove any internal paths or sensitive info + return error.message.replace(/\/internal\/.*\//, '[path]/') + .replace(/key=[\w-]+/, 'key=[redacted]') +} + /** * Creates a dagula instance backed by content claims. * - * @type {( - * Middleware< - * BlockContext & DagContext & UnixfsContext & IpfsUrlContext & LocatorContext, - * IpfsUrlContext & LocatorContext, - * Environment - * > - * )} + * @type {import('@web3-storage/gateway-lib').Middleware} */ -export function withContentClaimsDagula (handler) { +export function withContentClaimsDagula(handler) { return async (request, env, ctx) => { - const { locator } = ctx - const fetcher = BatchingFetcher.create(locator) - const dagula = new Dagula({ - async get (cid) { - const res = await fetcher.fetch(cid.multihash) - return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined - }, - async stream (cid, options) { - const res = await fetcher.fetch(cid.multihash, options) - return res.ok ? res.ok.stream() : undefined - }, - async stat (cid) { - const res = await locator.locate(cid.multihash) - return res.ok ? { size: res.ok.site[0].range.length } : undefined + const { dataCid } = ctx + + try { + const locator = ContentClaimsLocator.create({ + serviceURL: env.CONTENT_CLAIMS_SERVICE_URL ? new URL(env.CONTENT_CLAIMS_SERVICE_URL) : undefined, + retryOptions: { + retries: 3, + minTimeout: 1000, + maxTimeout: 5000 + } + }) + + const locRes = await locator.locate(dataCid.multihash) + + if (locRes.error) { + const errorType = locRes.error.name || ERROR_TYPES.INTERNAL_ERROR + console.error(`Content claims error: ${errorType}`, { + cid: dataCid.toString(), + error: locRes.error + }) + + switch (errorType) { + case ERROR_TYPES.NOT_FOUND: + throw new HttpError('Not Found', { status: 404 }) + case ERROR_TYPES.UNAUTHORIZED: + throw new HttpError('Unauthorized', { status: 401 }) + case ERROR_TYPES.RATE_LIMITED: + throw new HttpError('Too Many Requests', { status: 429 }) + case ERROR_TYPES.VALIDATION_ERROR: + throw new HttpError('Bad Request', { status: 400 }) + default: + // Log the full error internally but return a sanitized message + console.error('Internal content claims error:', locRes.error) + throw new HttpError('Internal Server Error', { status: 500 }) + } } - }) - return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula }) + + const fetcher = BatchingFetcher.create(locator) + const dagula = new Dagula({ + async get(cid) { + try { + const res = await fetcher.fetch(cid.multihash) + return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined + } catch (error) { + console.error('Block fetch error:', { + cid: cid.toString(), + error: sanitizeErrorMessage(error) + }) + throw new HttpError('Failed to fetch block', { status: 502 }) + } + }, + async stream(cid, options) { + try { + const res = await fetcher.fetch(cid.multihash, options) + return res.ok ? res.ok.stream() : undefined + } catch (error) { + console.error('Stream error:', { + cid: cid.toString(), + error: sanitizeErrorMessage(error) + }) + throw new HttpError('Failed to stream content', { status: 502 }) + } + }, + async stat(cid) { + try { + const res = await locator.locate(cid.multihash) + return res.ok ? { size: res.ok.site[0].range.length } : undefined + } catch (error) { + console.error('Stat error:', { + cid: cid.toString(), + error: sanitizeErrorMessage(error) + }) + throw new HttpError('Failed to get content stats', { status: 502 }) + } + } + }) + + return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula }) + } catch (error) { + // Catch any unhandled errors + console.error('Unhandled content claims error:', { + cid: dataCid.toString(), + error: sanitizeErrorMessage(error) + }) + throw new HttpError('Service Unavailable', { status: 503 }) + } } } diff --git a/src/middleware/withMemoryBudget.js b/src/middleware/withMemoryBudget.js new file mode 100644 index 0000000..af1b2dd --- /dev/null +++ b/src/middleware/withMemoryBudget.js @@ -0,0 +1,139 @@ +import { HttpError } from '@web3-storage/gateway-lib/util' + +/** + * Default memory limits in bytes + */ +const DEFAULT_MEMORY_LIMITS = { + MAX_BLOCK_SIZE: 1024 * 1024 * 10, // 10MB max block size + MAX_BATCH_SIZE: 1024 * 1024 * 50, // 50MB max batch size + MAX_CONCURRENT_BLOCKS: 100, // Maximum number of blocks to process concurrently + MEMORY_THRESHOLD: 0.8 // 80% memory threshold before throttling +} + +/** + * Memory budget tracking for block operations + */ +class MemoryBudget { + constructor(limits = DEFAULT_MEMORY_LIMITS) { + this.limits = limits + this.currentMemoryUsage = 0 + this.activeBlocks = new Set() + } + + /** + * Check if operation would exceed memory budget + * @param {number} size - Size of the operation in bytes + * @returns {boolean} + */ + wouldExceedBudget(size) { + return ( + size > this.limits.MAX_BLOCK_SIZE || + this.currentMemoryUsage + size > this.limits.MAX_BATCH_SIZE || + this.activeBlocks.size >= this.limits.MAX_CONCURRENT_BLOCKS + ) + } + + /** + * Track memory usage for a block operation + * @param {string} blockId - Unique identifier for the block + * @param {number} size - Size in bytes + */ + trackBlock(blockId, size) { + if (this.wouldExceedBudget(size)) { + throw new HttpError('Memory budget exceeded', { status: 413 }) + } + this.currentMemoryUsage += size + this.activeBlocks.add(blockId) + } + + /** + * Release memory for a block operation + * @param {string} blockId - Block identifier to release + * @param {number} size - Size in bytes to release + */ + releaseBlock(blockId, size) { + this.currentMemoryUsage = Math.max(0, this.currentMemoryUsage - size) + this.activeBlocks.delete(blockId) + } + + /** + * Get current memory usage statistics + * @returns {Object} Memory usage stats + */ + getStats() { + return { + currentUsage: this.currentMemoryUsage, + activeBlocks: this.activeBlocks.size, + isThrottled: this.currentMemoryUsage > (this.limits.MAX_BATCH_SIZE * this.limits.MEMORY_THRESHOLD) + } + } +} + +/** + * Middleware to manage memory budget for block operations + * @type {import('@web3-storage/gateway-lib').Middleware} + */ +export function withMemoryBudget(handler) { + return async (request, env, ctx) => { + const memoryBudget = new MemoryBudget() + + // Wrap the context with memory budgeting + const budgetedCtx = { + ...ctx, + memoryBudget, + blocks: { + ...ctx.blocks, + get: async (cid) => { + const blockId = cid.toString() + try { + const block = await ctx.blocks.get(cid) + if (block) { + memoryBudget.trackBlock(blockId, block.bytes.length) + } + return block + } catch (error) { + console.error('Block fetch error:', { blockId, error }) + throw error + } + }, + stream: async (cid, options) => { + const blockId = cid.toString() + const stream = await ctx.blocks.stream(cid, options) + + if (!stream) return stream + + // Wrap the stream to track memory usage + return new TransformStream({ + start(controller) { + memoryBudget.trackBlock(blockId, 0) + }, + transform(chunk, controller) { + memoryBudget.trackBlock(blockId, chunk.length) + controller.enqueue(chunk) + }, + flush(controller) { + memoryBudget.releaseBlock(blockId) + } + }) + } + } + } + + try { + const response = await handler(request, env, budgetedCtx) + + // Add memory usage stats to response headers + const stats = memoryBudget.getStats() + response.headers.set('X-Memory-Usage', stats.currentUsage.toString()) + response.headers.set('X-Active-Blocks', stats.activeBlocks.toString()) + + return response + } catch (error) { + if (error instanceof HttpError && error.status === 413) { + // Memory budget exceeded + console.warn('Memory budget exceeded:', memoryBudget.getStats()) + } + throw error + } + } +} \ No newline at end of file diff --git a/src/middleware/withRangeCache.js b/src/middleware/withRangeCache.js new file mode 100644 index 0000000..86415b8 --- /dev/null +++ b/src/middleware/withRangeCache.js @@ -0,0 +1,162 @@ +import { HttpError } from '@web3-storage/gateway-lib/util' +import { parseRange } from 'http-range-parse' + +/** + * Cache configuration for range requests + */ +const RANGE_CACHE_CONFIG = { + // Cache segments in 1MB chunks for efficient range handling + CHUNK_SIZE: 1024 * 1024, + // Maximum number of chunks to cache per file + MAX_CHUNKS: 100, + // TTL for cached chunks (1 hour) + CHUNK_TTL: 3600, + // Popular ranges to pre-cache (e.g., video preview segments) + POPULAR_RANGES: [ + { start: 0, end: 1024 * 1024 }, // First 1MB + { start: 0, end: 1024 * 512 } // First 512KB + ] +} + +/** + * Generates cache keys for range segments + * @param {string} resourceId - Resource identifier (e.g., CID) + * @param {number} chunkIndex - Index of the chunk + * @returns {string} Cache key + */ +function getRangeCacheKey(resourceId, chunkIndex) { + return `range:${resourceId}:chunk:${chunkIndex}` +} + +/** + * Calculates chunk indices for a given range + * @param {number} start - Range start + * @param {number} end - Range end + * @returns {number[]} Array of chunk indices + */ +function getChunkIndices(start, end) { + const startChunk = Math.floor(start / RANGE_CACHE_CONFIG.CHUNK_SIZE) + const endChunk = Math.floor(end / RANGE_CACHE_CONFIG.CHUNK_SIZE) + return Array.from( + { length: endChunk - startChunk + 1 }, + (_, i) => startChunk + i + ) +} + +/** + * Middleware for handling range requests with efficient caching + * @type {import('@web3-storage/gateway-lib').Middleware} + */ +export function withRangeCache(handler) { + return async (request, env, ctx) => { + const rangeHeader = request.headers.get('Range') + if (!rangeHeader) { + return handler(request, env, ctx) + } + + try { + const { dataCid } = ctx + const resourceId = dataCid.toString() + + // Parse range header + const ranges = parseRange(rangeHeader) + if (!ranges || ranges.length === 0) { + throw new HttpError('Invalid Range header', { status: 416 }) + } + + // Get total size first + const stats = await ctx.blocks.stat(dataCid) + if (!stats || !stats.size) { + throw new HttpError('Unable to determine content size', { status: 500 }) + } + + const totalSize = stats.size + const range = ranges[0] // Handle first range for now + const end = range.end === undefined ? totalSize - 1 : Math.min(range.end, totalSize - 1) + const start = Math.min(range.start, end) + + if (start >= totalSize) { + throw new HttpError('Range Not Satisfiable', { status: 416 }) + } + + // Calculate required chunks + const chunks = getChunkIndices(start, end) + const cachedChunks = new Map() + + // Try to get cached chunks + await Promise.all( + chunks.map(async (chunkIndex) => { + const cacheKey = getRangeCacheKey(resourceId, chunkIndex) + const cached = await env.RANGE_CACHE.get(cacheKey) + if (cached) { + cachedChunks.set(chunkIndex, cached) + } + }) + ) + + // If we have all chunks cached, construct response from cache + if (cachedChunks.size === chunks.length) { + const contentLength = end - start + 1 + const headers = new Headers({ + 'Content-Range': `bytes ${start}-${end}/${totalSize}`, + 'Content-Length': contentLength.toString(), + 'Accept-Ranges': 'bytes', + 'Cache-Control': 'public, max-age=3600' + }) + + // Combine cached chunks and slice to exact range + const combinedData = chunks + .map(idx => cachedChunks.get(idx)) + .join('') + .slice( + start % RANGE_CACHE_CONFIG.CHUNK_SIZE, + (start % RANGE_CACHE_CONFIG.CHUNK_SIZE) + contentLength + ) + + return new Response(combinedData, { + status: 206, + headers + }) + } + + // Get full response and cache chunks + const response = await handler(request, env, ctx) + const buffer = await response.arrayBuffer() + + // Cache chunks in background + ctx.waitUntil( + Promise.all( + chunks.map(async (chunkIndex) => { + if (cachedChunks.has(chunkIndex)) return + + const chunkStart = chunkIndex * RANGE_CACHE_CONFIG.CHUNK_SIZE + const chunkEnd = Math.min(chunkStart + RANGE_CACHE_CONFIG.CHUNK_SIZE, buffer.byteLength) + const chunk = buffer.slice(chunkStart, chunkEnd) + + const cacheKey = getRangeCacheKey(resourceId, chunkIndex) + await env.RANGE_CACHE.put(cacheKey, chunk, { + expirationTtl: RANGE_CACHE_CONFIG.CHUNK_TTL + }) + }) + ) + ) + + // Return the range response + return new Response(buffer.slice(start, end + 1), { + status: 206, + headers: { + 'Content-Range': `bytes ${start}-${end}/${totalSize}`, + 'Content-Length': (end - start + 1).toString(), + 'Accept-Ranges': 'bytes', + 'Cache-Control': 'public, max-age=3600' + } + }) + } catch (error) { + console.error('Range cache error:', error) + if (error instanceof HttpError) { + throw error + } + throw new HttpError('Internal Server Error', { status: 500 }) + } + } +} \ No newline at end of file