Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions convex/crons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ crons.interval(
{ batchSize: 200, maxBatches: 5 },
)

crons.interval(
'download-dedupe-prune',
{ hours: 24 },
internal.downloads.pruneDownloadDedupesInternal,
{},
)

export default crons
12 changes: 12 additions & 0 deletions convex/downloads.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { describe, expect, it } from 'vitest'
import { __test } from './downloads'

describe('downloads helpers', () => {
it('calculates day start boundaries', () => {
const day = 86_400_000
expect(__test.getDayStart(0)).toBe(0)
expect(__test.getDayStart(day - 1)).toBe(0)
expect(__test.getDayStart(day)).toBe(day)
expect(__test.getDayStart(day + 1)).toBe(day)
})
})
94 changes: 83 additions & 11 deletions convex/downloads.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { v } from 'convex/values'
import { zipSync } from 'fflate'
import { api } from './_generated/api'
import { httpAction, mutation } from './_generated/server'
import { api, internal } from './_generated/api'
import { httpAction, internalMutation } from './_generated/server'
import { applyRateLimit, getClientIp } from './lib/httpRateLimit'
import { applySkillStatDeltas, bumpDailySkillStats } from './lib/skillStats'
import { hashToken } from './lib/tokens'

const DAY_MS = 86_400_000
const DEDUPE_RETENTION_DAYS = 14

export const downloadZip = httpAction(async (ctx, request) => {
const url = new URL(request.url)
Expand All @@ -14,6 +19,9 @@ export const downloadZip = httpAction(async (ctx, request) => {
return new Response('Missing slug', { status: 400 })
}

const rate = await applyRateLimit(ctx, request, 'download')
if (!rate.ok) return rate.response

const skillResult = await ctx.runQuery(api.skills.getBySlug, { slug })
if (!skillResult?.skill) {
return new Response('Skill not found', { status: 404 })
Expand Down Expand Up @@ -53,29 +61,93 @@ export const downloadZip = httpAction(async (ctx, request) => {
const zipArray = Uint8Array.from(zipData)
const zipBlob = new Blob([zipArray], { type: 'application/zip' })

await ctx.runMutation(api.downloads.increment, { skillId: skill._id })
const ip = getClientIp(request) ?? 'unknown'
const ipHash = await hashToken(ip)
const dayStart = getDayStart(Date.now())
try {
await ctx.runMutation(internal.downloads.recordDownloadInternal, {
skillId: skill._id,
ipHash,
dayStart,
})
} catch {
// Ignore download count failures.
}

return new Response(zipBlob, {
status: 200,
headers: {
headers: mergeHeaders(rate.headers, {
'Content-Type': 'application/zip',
'Content-Disposition': `attachment; filename="${slug}-${version.version}.zip"`,
'Cache-Control': 'private, max-age=60',
},
}),
})
})

export const increment = mutation({
args: { skillId: v.id('skills') },
export const recordDownloadInternal = internalMutation({
args: {
skillId: v.id('skills'),
ipHash: v.string(),
dayStart: v.number(),
},
handler: async (ctx, args) => {
const skill = await ctx.db.get(args.skillId)
if (!skill) return

const existing = await ctx.db
.query('downloadDedupes')
.withIndex('by_skill_ip_day', (q) =>
q.eq('skillId', args.skillId).eq('ipHash', args.ipHash).eq('dayStart', args.dayStart),
)
.unique()
if (existing) return

const now = Date.now()
const patch = applySkillStatDeltas(skill, { downloads: 1 })
await ctx.db.patch(skill._id, {
...patch,
updatedAt: now,
await ctx.db.insert('downloadDedupes', {
skillId: args.skillId,
ipHash: args.ipHash,
dayStart: args.dayStart,
createdAt: now,
})

const patch = applySkillStatDeltas(skill, { downloads: 1 })
await ctx.db.patch(skill._id, { ...patch, updatedAt: now })
await bumpDailySkillStats(ctx, { skillId: skill._id, now, downloads: 1 })
},
})

export const pruneDownloadDedupesInternal = internalMutation({
args: {},
handler: async (ctx) => {
const cutoff = Date.now() - DEDUPE_RETENTION_DAYS * DAY_MS
let remaining = true
let batches = 0
while (remaining && batches < 10) {
const stale = await ctx.db
.query('downloadDedupes')
.withIndex('by_day')
.filter((q) => q.lt(q.field('dayStart'), cutoff))
.take(200)
if (stale.length === 0) {
remaining = false
break
}
for (const entry of stale) {
await ctx.db.delete(entry._id)
}
batches += 1
}
},
})

export function getDayStart(timestamp: number) {
return Math.floor(timestamp / DAY_MS) * DAY_MS
}

export const __test = {
getDayStart,
}

function mergeHeaders(base: HeadersInit, extra: HeadersInit) {
return { ...(base as Record<string, string>), ...(extra as Record<string, string>) }
}
91 changes: 2 additions & 89 deletions convex/httpApiV1.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import { CliPublishRequestSchema, parseArk } from 'clawdhub-schema'
import { api, internal } from './_generated/api'
import type { Doc, Id } from './_generated/dataModel'
import type { ActionCtx } from './_generated/server'
import { httpAction } from './_generated/server'
import { type ActionCtx, httpAction } from './_generated/server'
import { requireApiTokenUser } from './lib/apiTokenAuth'
import { hashToken } from './lib/tokens'
import { applyRateLimit, parseBearerToken } from './lib/httpRateLimit'
import { publishVersionForUser } from './skills'
import { publishSoulVersionForUser } from './souls'

const RATE_LIMIT_WINDOW_MS = 60_000
const RATE_LIMITS = {
read: { ip: 120, key: 600 },
write: { ip: 30, key: 120 },
} as const
const MAX_RAW_FILE_BYTES = 200 * 1024

type SearchSkillEntry = {
Expand Down Expand Up @@ -631,87 +625,6 @@ async function resolveTags(
return resolved
}

async function applyRateLimit(
ctx: ActionCtx,
request: Request,
kind: 'read' | 'write',
): Promise<{ ok: true; headers: HeadersInit } | { ok: false; response: Response }> {
const ip = getClientIp(request) ?? 'unknown'
const ipResult = await checkRateLimit(ctx, `ip:${ip}`, RATE_LIMITS[kind].ip)
const token = parseBearerToken(request)
const keyResult = token
? await checkRateLimit(ctx, `key:${await hashToken(token)}`, RATE_LIMITS[kind].key)
: null

const chosen = pickMostRestrictive(ipResult, keyResult)
const headers = rateHeaders(chosen)

if (!ipResult.allowed || (keyResult && !keyResult.allowed)) {
return {
ok: false,
response: text('Rate limit exceeded', 429, headers),
}
}

return { ok: true, headers }
}

type RateLimitResult = {
allowed: boolean
remaining: number
limit: number
resetAt: number
}

async function checkRateLimit(
ctx: ActionCtx,
key: string,
limit: number,
): Promise<RateLimitResult> {
return (await ctx.runMutation(internal.rateLimits.checkRateLimitInternal, {
key,
limit,
windowMs: RATE_LIMIT_WINDOW_MS,
})) as RateLimitResult
}

function pickMostRestrictive(primary: RateLimitResult, secondary: RateLimitResult | null) {
if (!secondary) return primary
if (!primary.allowed) return primary
if (!secondary.allowed) return secondary
return secondary.remaining < primary.remaining ? secondary : primary
}

function rateHeaders(result: RateLimitResult): HeadersInit {
const resetSeconds = Math.ceil(result.resetAt / 1000)
return {
'X-RateLimit-Limit': String(result.limit),
'X-RateLimit-Remaining': String(result.remaining),
'X-RateLimit-Reset': String(resetSeconds),
...(result.allowed ? {} : { 'Retry-After': String(resetSeconds) }),
}
}

function getClientIp(request: Request) {
const header =
request.headers.get('cf-connecting-ip') ??
request.headers.get('x-real-ip') ??
request.headers.get('x-forwarded-for') ??
request.headers.get('fly-client-ip')
if (!header) return null
if (header.includes(',')) return header.split(',')[0]?.trim() || null
return header.trim()
}

function parseBearerToken(request: Request) {
const header = request.headers.get('authorization') ?? request.headers.get('Authorization')
if (!header) return null
const trimmed = header.trim()
if (!trimmed.toLowerCase().startsWith('bearer ')) return null
const token = trimmed.slice(7).trim()
return token || null
}

function json(value: unknown, status = 200, headers?: HeadersInit) {
return new Response(JSON.stringify(value), {
status,
Expand Down
35 changes: 35 additions & 0 deletions convex/lib/httpRateLimit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* @vitest-environment node */
import { describe, expect, it } from 'vitest'
import { getClientIp } from './httpRateLimit'

describe('getClientIp', () => {
it('returns null when cf-connecting-ip missing', () => {
const request = new Request('https://example.com', {
headers: {
'x-forwarded-for': '203.0.113.9',
},
})
process.env.TRUST_FORWARDED_IPS = ''
expect(getClientIp(request)).toBeNull()
})

it('returns first ip from cf-connecting-ip', () => {
const request = new Request('https://example.com', {
headers: {
'cf-connecting-ip': '203.0.113.1, 198.51.100.2',
},
})
expect(getClientIp(request)).toBe('203.0.113.1')
})

it('uses forwarded headers when opt-in enabled', () => {
const request = new Request('https://example.com', {
headers: {
'x-forwarded-for': '203.0.113.9, 198.51.100.2',
},
})
process.env.TRUST_FORWARDED_IPS = 'true'
expect(getClientIp(request)).toBe('203.0.113.9')
process.env.TRUST_FORWARDED_IPS = ''
})
})
Loading