Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import {
withEgressTracker,
withAuthorizedSpace,
withLocator,
withDelegationStubs
withDelegationStubs,
withMemoryBudget,
withRangeCache
} from './middleware/index.js'
import { instrument } from '@microlabs/otel-cf-workers'
import { NoopSpanProcessor } from '@opentelemetry/sdk-trace-base'
Expand All @@ -47,32 +49,34 @@ const handler = {
fetch (request, env, ctx) {
console.log(request.method, request.url)
const middleware = composeMiddleware(
// Prepare the Context
withCdnCache,
// Basic context and error handling
withContext,
withErrorHandler,
withCorsHeaders,
withVersionHeader,
withErrorHandler,

// Security and resource management
withMemoryBudget, // Add memory budgeting early
withRateLimit, // Rate limiting before expensive operations
withAuthToken, // Authentication
withAuthorizedSpace, // Authorization

// Request parsing and validation
withParsedIpfsUrl,
createWithHttpMethod('GET', 'HEAD'),
withAuthToken,

// Caching and performance
withCdnCache,
withRangeCache, // Range request handling

// Data fetching and processing
withLocator,
withDelegationStubs,

// Rate-limit requests
withRateLimit,

// Track egress bytes
withEgressTracker,

// Fetch data
withCarBlockHandler,
withAuthorizedSpace,
withContentClaimsDagula,
withFormatRawHandler,
withFormatCarHandler,

// Prepare the Response
withEgressTracker,

// Response preparation
withContentDispositionHeader,
withFixedLengthStream
)
Expand Down
8 changes: 5 additions & 3 deletions src/middleware/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
export { withContentClaimsDagula } from './withContentClaimsDagula.js'
export { withVersionHeader } from './withVersionHeader.js'
export { withAuthToken } from './withAuthToken.js'
export { withCarBlockHandler } from './withCarBlockHandler.js'
export { withContentClaimsDagula } from './withContentClaimsDagula.js'
export { withRateLimit } from './withRateLimit.js'
export { withVersionHeader } from './withVersionHeader.js'
export { withEgressTracker } from './withEgressTracker.js'
export { withAuthorizedSpace } from './withAuthorizedSpace.js'
export { withLocator } from './withLocator.js'
export { withEgressTracker } from './withEgressTracker.js'
export { withDelegationStubs } from './withDelegationStubs.js'
export { withMemoryBudget } from './withMemoryBudget.js'
export { withRangeCache } from './withRangeCache.js'
135 changes: 111 additions & 24 deletions src/middleware/withContentClaimsDagula.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Dagula } from 'dagula'
import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
import { HttpError } from 'http-errors'
import { ContentClaimsLocator } from '@web3-storage/gateway-lib'

/**
* @import {
Expand All @@ -13,35 +15,120 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
* @import { Environment } from './withContentClaimsDagula.types.js'
*/

/**
* Error categories for content claims
*/
const ERROR_TYPES = {
NOT_FOUND: 'NotFound',
UNAUTHORIZED: 'Unauthorized',
RATE_LIMITED: 'RateLimited',
VALIDATION_ERROR: 'ValidationError',
INTERNAL_ERROR: 'InternalError'
}

/**
* Sanitizes error messages for external consumption
* @param {Error} error - The original error
* @returns {string} - Sanitized error message
*/
function sanitizeErrorMessage(error) {
// Remove any internal paths or sensitive info
return error.message.replace(/\/internal\/.*\//, '[path]/')
.replace(/key=[\w-]+/, 'key=[redacted]')
}

/**
* Creates a dagula instance backed by content claims.
*
* @type {(
* Middleware<
* BlockContext & DagContext & UnixfsContext & IpfsUrlContext & LocatorContext,
* IpfsUrlContext & LocatorContext,
* Environment
* >
* )}
* @type {import('@web3-storage/gateway-lib').Middleware<BlockContext & DagContext & UnixfsContext & IpfsUrlContext, IpfsUrlContext, Environment>}
*/
export function withContentClaimsDagula (handler) {
export function withContentClaimsDagula(handler) {
return async (request, env, ctx) => {
const { locator } = ctx
const fetcher = BatchingFetcher.create(locator)
const dagula = new Dagula({
async get (cid) {
const res = await fetcher.fetch(cid.multihash)
return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
},
async stream (cid, options) {
const res = await fetcher.fetch(cid.multihash, options)
return res.ok ? res.ok.stream() : undefined
},
async stat (cid) {
const res = await locator.locate(cid.multihash)
return res.ok ? { size: res.ok.site[0].range.length } : undefined
const { dataCid } = ctx

try {
const locator = ContentClaimsLocator.create({
serviceURL: env.CONTENT_CLAIMS_SERVICE_URL ? new URL(env.CONTENT_CLAIMS_SERVICE_URL) : undefined,
retryOptions: {
retries: 3,
minTimeout: 1000,
maxTimeout: 5000
}
})

const locRes = await locator.locate(dataCid.multihash)

if (locRes.error) {
const errorType = locRes.error.name || ERROR_TYPES.INTERNAL_ERROR
console.error(`Content claims error: ${errorType}`, {
cid: dataCid.toString(),
error: locRes.error
})

switch (errorType) {
case ERROR_TYPES.NOT_FOUND:
throw new HttpError('Not Found', { status: 404 })
case ERROR_TYPES.UNAUTHORIZED:
throw new HttpError('Unauthorized', { status: 401 })
case ERROR_TYPES.RATE_LIMITED:
throw new HttpError('Too Many Requests', { status: 429 })
case ERROR_TYPES.VALIDATION_ERROR:
throw new HttpError('Bad Request', { status: 400 })
default:
// Log the full error internally but return a sanitized message
console.error('Internal content claims error:', locRes.error)
throw new HttpError('Internal Server Error', { status: 500 })
}
}
})
return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula })

const fetcher = BatchingFetcher.create(locator)
const dagula = new Dagula({
async get(cid) {
try {
const res = await fetcher.fetch(cid.multihash)
return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
} catch (error) {
console.error('Block fetch error:', {
cid: cid.toString(),
error: sanitizeErrorMessage(error)
})
throw new HttpError('Failed to fetch block', { status: 502 })
}
},
async stream(cid, options) {
try {
const res = await fetcher.fetch(cid.multihash, options)
return res.ok ? res.ok.stream() : undefined
} catch (error) {
console.error('Stream error:', {
cid: cid.toString(),
error: sanitizeErrorMessage(error)
})
throw new HttpError('Failed to stream content', { status: 502 })
}
},
async stat(cid) {
try {
const res = await locator.locate(cid.multihash)
return res.ok ? { size: res.ok.site[0].range.length } : undefined
} catch (error) {
console.error('Stat error:', {
cid: cid.toString(),
error: sanitizeErrorMessage(error)
})
throw new HttpError('Failed to get content stats', { status: 502 })
}
}
})

return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula })
} catch (error) {
// Catch any unhandled errors
console.error('Unhandled content claims error:', {
cid: dataCid.toString(),
error: sanitizeErrorMessage(error)
})
throw new HttpError('Service Unavailable', { status: 503 })
}
}
}
139 changes: 139 additions & 0 deletions src/middleware/withMemoryBudget.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { HttpError } from '@web3-storage/gateway-lib/util'

/**
* Default memory limits in bytes
*/
const DEFAULT_MEMORY_LIMITS = {
MAX_BLOCK_SIZE: 1024 * 1024 * 10, // 10MB max block size
MAX_BATCH_SIZE: 1024 * 1024 * 50, // 50MB max batch size
MAX_CONCURRENT_BLOCKS: 100, // Maximum number of blocks to process concurrently
MEMORY_THRESHOLD: 0.8 // 80% memory threshold before throttling
}

/**
* Memory budget tracking for block operations
*/
class MemoryBudget {
constructor(limits = DEFAULT_MEMORY_LIMITS) {
this.limits = limits
this.currentMemoryUsage = 0
this.activeBlocks = new Set()
}

/**
* Check if operation would exceed memory budget
* @param {number} size - Size of the operation in bytes
* @returns {boolean}
*/
wouldExceedBudget(size) {
return (
size > this.limits.MAX_BLOCK_SIZE ||
this.currentMemoryUsage + size > this.limits.MAX_BATCH_SIZE ||
this.activeBlocks.size >= this.limits.MAX_CONCURRENT_BLOCKS
)
}

/**
* Track memory usage for a block operation
* @param {string} blockId - Unique identifier for the block
* @param {number} size - Size in bytes
*/
trackBlock(blockId, size) {
if (this.wouldExceedBudget(size)) {
throw new HttpError('Memory budget exceeded', { status: 413 })
}
this.currentMemoryUsage += size
this.activeBlocks.add(blockId)
}

/**
* Release memory for a block operation
* @param {string} blockId - Block identifier to release
* @param {number} size - Size in bytes to release
*/
releaseBlock(blockId, size) {
this.currentMemoryUsage = Math.max(0, this.currentMemoryUsage - size)
this.activeBlocks.delete(blockId)
}

/**
* Get current memory usage statistics
* @returns {Object} Memory usage stats
*/
getStats() {
return {
currentUsage: this.currentMemoryUsage,
activeBlocks: this.activeBlocks.size,
isThrottled: this.currentMemoryUsage > (this.limits.MAX_BATCH_SIZE * this.limits.MEMORY_THRESHOLD)
}
}
}

/**
* Middleware to manage memory budget for block operations
* @type {import('@web3-storage/gateway-lib').Middleware}
*/
export function withMemoryBudget(handler) {
return async (request, env, ctx) => {
const memoryBudget = new MemoryBudget()

// Wrap the context with memory budgeting
const budgetedCtx = {
...ctx,
memoryBudget,
blocks: {
...ctx.blocks,
get: async (cid) => {
const blockId = cid.toString()
try {
const block = await ctx.blocks.get(cid)
if (block) {
memoryBudget.trackBlock(blockId, block.bytes.length)
}
return block
} catch (error) {
console.error('Block fetch error:', { blockId, error })
throw error
}
},
stream: async (cid, options) => {
const blockId = cid.toString()
const stream = await ctx.blocks.stream(cid, options)

if (!stream) return stream

// Wrap the stream to track memory usage
return new TransformStream({
start(controller) {
memoryBudget.trackBlock(blockId, 0)
},
transform(chunk, controller) {
memoryBudget.trackBlock(blockId, chunk.length)
controller.enqueue(chunk)
},
flush(controller) {
memoryBudget.releaseBlock(blockId)
}
})
}
}
}

try {
const response = await handler(request, env, budgetedCtx)

// Add memory usage stats to response headers
const stats = memoryBudget.getStats()
response.headers.set('X-Memory-Usage', stats.currentUsage.toString())
response.headers.set('X-Active-Blocks', stats.activeBlocks.toString())

return response
} catch (error) {
if (error instanceof HttpError && error.status === 413) {
// Memory budget exceeded
console.warn('Memory budget exceeded:', memoryBudget.getStats())
}
throw error
}
}
}
Loading