Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/tempo/client/SessionManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,48 @@ describe('Session', () => {
})
})

describe('.ws()', () => {
test('throws when no challenge received from HTTP endpoint', async () => {
const mockFetch = vi.fn().mockResolvedValue(makeOkResponse())

const s = sessionManager({
account: '0x0000000000000000000000000000000000000001',
fetch: mockFetch as typeof globalThis.fetch,
})

await expect(s.ws('ws://api.example.com/stream')).rejects.toThrow(
'No payment challenge received',
)
})

test('converts ws:// to http:// for the 402 handshake', async () => {
const mockFetch = vi.fn().mockResolvedValue(make402Response())

const s = sessionManager({
account: '0x0000000000000000000000000000000000000001',
fetch: mockFetch as typeof globalThis.fetch,
})

// Will throw because no maxDeposit — but we can verify the URL was converted
await expect(s.ws('ws://api.example.com/stream')).rejects.toThrow()
const calledUrl = mockFetch.mock.calls[0]?.[0]
expect(calledUrl).toContain('http://api.example.com/stream')
})

test('converts wss:// to https:// for the 402 handshake', async () => {
const mockFetch = vi.fn().mockResolvedValue(make402Response())

const s = sessionManager({
account: '0x0000000000000000000000000000000000000001',
fetch: mockFetch as typeof globalThis.fetch,
})

await expect(s.ws('wss://api.example.com/stream')).rejects.toThrow()
const calledUrl = mockFetch.mock.calls[0]?.[0]
expect(calledUrl).toContain('https://api.example.com/stream')
})
})

describe('.close()', () => {
test('is no-op when not opened', async () => {
const mockFetch = vi.fn()
Expand Down
118 changes: 118 additions & 0 deletions src/tempo/client/SessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ export type SessionManager = {
signal?: AbortSignal | undefined
},
): Promise<AsyncIterable<string>>
ws(
input: string | URL,
init?: {
protocols?: string | string[]
onReceipt?: ((receipt: SessionReceipt) => void) | undefined
signal?: AbortSignal | undefined
},
): Promise<WebSocket>
close(): Promise<SessionReceipt | undefined>
}

Expand Down Expand Up @@ -239,6 +247,116 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa
return iterate()
},

async ws(input, init) {
const { onReceipt, signal, protocols } = init ?? {}

// Convert ws:// → http:// for the 402 challenge/channel-open flow
const wsUrl = new URL(input.toString())
const httpUrl = new URL(wsUrl.toString())
httpUrl.protocol = wsUrl.protocol === 'wss:' ? 'https:' : 'http:'

// Trigger 402 → channel open via HTTP (reuses existing fetch flow)
const httpResponse = await doFetch(httpUrl.toString())
const wsChallenge = httpResponse.challenge

if (!wsChallenge) {
throw new Error(
'No payment challenge received from HTTP endpoint for this WebSocket URL. The server may not require payment or did not advertise a challenge.',
)
}

// Open WebSocket
const ws = new WebSocket(wsUrl.toString(), protocols)

await new Promise<void>((resolve, reject) => {
const onOpen = () => {
ws.removeEventListener('error', onError)
resolve()
}
const onError = (e: Event) => {
ws.removeEventListener('open', onOpen)
reject(e)
}
ws.addEventListener('open', onOpen, { once: true })
ws.addEventListener('error', onError, { once: true })
})

// Send initial credential as first message (in-band auth)
if (channel && wsChallenge) {
const credential = await method.createCredential({
challenge: wsChallenge as never,
context: {
action: 'voucher',
channelId: channel.channelId,
cumulativeAmountRaw: channel.cumulativeAmount.toString(),
},
})
ws.send(JSON.stringify({ mpp: 'credential', mppVersion: '1', authorization: credential }))
}

// Intercept payment messages (need-voucher, receipt)
// stopImmediatePropagation prevents MPP messages from reaching user listeners.
ws.addEventListener('message', async (event) => {
const raw = typeof event.data === 'string' ? event.data : undefined
if (!raw) return
try {
const msg = JSON.parse(raw)
if (!msg.mpp) return

event.stopImmediatePropagation()

if (msg.mpp === 'payment-need-voucher' && channel && wsChallenge) {
const required = BigInt(msg.data.requiredCumulative)
const accepted = BigInt(msg.data.acceptedCumulative ?? '0')
const deposit = BigInt(msg.data.deposit ?? '0')

// Pre-authorize a batch of chunks to reduce round-trips.
// On first need-voucher (accepted=0), use `required` as the unit price
// since the server tells us exactly what one unit costs.
// On subsequent vouchers, derive unit price from the delta.
const unitPrice = accepted === 0n
? (required > 0n ? required : 1n)
: (required > accepted ? required - accepted : 1n)
const batchTarget = required + unitPrice * 99n // ~100 chunks total
// Cap at deposit to avoid exceeding on-chain balance
const capped = deposit > 0n && batchTarget > deposit ? deposit : batchTarget
const newCumulative = capped > required ? capped : required

channel.cumulativeAmount =
channel.cumulativeAmount > newCumulative ? channel.cumulativeAmount : newCumulative

try {
const credential = await method.createCredential({
challenge: wsChallenge as never,
context: {
action: 'voucher',
channelId: channel.channelId,
cumulativeAmountRaw: channel.cumulativeAmount.toString(),
},
})
ws.send(
JSON.stringify({ mpp: 'voucher', mppVersion: '1', authorization: credential }),
)
} catch (err) {
console.error('[mppx] ws voucher creation failed:', err)
ws.close(1011, 'Failed to create payment credential')
}
} else if (msg.mpp === 'payment-receipt') {
updateSpentFromReceipt(msg.data)
onReceipt?.(msg.data)
}
} catch {
// Not JSON — not an MPP message, ignore
}
})

if (signal) {
signal.addEventListener('abort', () => ws.close(), { once: true })
}

return ws
},

async close() {
if (!channel?.opened || !lastChallenge) return undefined

Expand Down
Loading