diff --git a/src/tempo/client/SessionManager.test.ts b/src/tempo/client/SessionManager.test.ts index 92dece91..22c3278a 100644 --- a/src/tempo/client/SessionManager.test.ts +++ b/src/tempo/client/SessionManager.test.ts @@ -207,6 +207,48 @@ describe('Session', () => { }) }) + describe('.ws()', () => { + test('throws when no challenge received from HTTP endpoint', async () => { + const mockFetch = vi.fn().mockResolvedValue(makeOkResponse()) + + const s = sessionManager({ + account: '0x0000000000000000000000000000000000000001', + fetch: mockFetch as typeof globalThis.fetch, + }) + + await expect(s.ws('ws://api.example.com/stream')).rejects.toThrow( + 'No payment challenge received', + ) + }) + + test('converts ws:// to http:// for the 402 handshake', async () => { + const mockFetch = vi.fn().mockResolvedValue(make402Response()) + + const s = sessionManager({ + account: '0x0000000000000000000000000000000000000001', + fetch: mockFetch as typeof globalThis.fetch, + }) + + // Will throw because no maxDeposit — but we can verify the URL was converted + await expect(s.ws('ws://api.example.com/stream')).rejects.toThrow() + const calledUrl = mockFetch.mock.calls[0]?.[0] + expect(calledUrl).toContain('http://api.example.com/stream') + }) + + test('converts wss:// to https:// for the 402 handshake', async () => { + const mockFetch = vi.fn().mockResolvedValue(make402Response()) + + const s = sessionManager({ + account: '0x0000000000000000000000000000000000000001', + fetch: mockFetch as typeof globalThis.fetch, + }) + + await expect(s.ws('wss://api.example.com/stream')).rejects.toThrow() + const calledUrl = mockFetch.mock.calls[0]?.[0] + expect(calledUrl).toContain('https://api.example.com/stream') + }) + }) + describe('.close()', () => { test('is no-op when not opened', async () => { const mockFetch = vi.fn() diff --git a/src/tempo/client/SessionManager.ts b/src/tempo/client/SessionManager.ts index c0e33427..8e87f185 100644 --- a/src/tempo/client/SessionManager.ts +++ b/src/tempo/client/SessionManager.ts @@ -24,6 +24,14 @@ export type SessionManager = { signal?: AbortSignal | undefined }, ): Promise> + ws( + input: string | URL, + init?: { + protocols?: string | string[] + onReceipt?: ((receipt: SessionReceipt) => void) | undefined + signal?: AbortSignal | undefined + }, + ): Promise close(): Promise } @@ -239,6 +247,116 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa return iterate() }, + async ws(input, init) { + const { onReceipt, signal, protocols } = init ?? {} + + // Convert ws:// → http:// for the 402 challenge/channel-open flow + const wsUrl = new URL(input.toString()) + const httpUrl = new URL(wsUrl.toString()) + httpUrl.protocol = wsUrl.protocol === 'wss:' ? 'https:' : 'http:' + + // Trigger 402 → channel open via HTTP (reuses existing fetch flow) + const httpResponse = await doFetch(httpUrl.toString()) + const wsChallenge = httpResponse.challenge + + if (!wsChallenge) { + throw new Error( + 'No payment challenge received from HTTP endpoint for this WebSocket URL. The server may not require payment or did not advertise a challenge.', + ) + } + + // Open WebSocket + const ws = new WebSocket(wsUrl.toString(), protocols) + + await new Promise((resolve, reject) => { + const onOpen = () => { + ws.removeEventListener('error', onError) + resolve() + } + const onError = (e: Event) => { + ws.removeEventListener('open', onOpen) + reject(e) + } + ws.addEventListener('open', onOpen, { once: true }) + ws.addEventListener('error', onError, { once: true }) + }) + + // Send initial credential as first message (in-band auth) + if (channel && wsChallenge) { + const credential = await method.createCredential({ + challenge: wsChallenge as never, + context: { + action: 'voucher', + channelId: channel.channelId, + cumulativeAmountRaw: channel.cumulativeAmount.toString(), + }, + }) + ws.send(JSON.stringify({ mpp: 'credential', mppVersion: '1', authorization: credential })) + } + + // Intercept payment messages (need-voucher, receipt) + // stopImmediatePropagation prevents MPP messages from reaching user listeners. + ws.addEventListener('message', async (event) => { + const raw = typeof event.data === 'string' ? event.data : undefined + if (!raw) return + try { + const msg = JSON.parse(raw) + if (!msg.mpp) return + + event.stopImmediatePropagation() + + if (msg.mpp === 'payment-need-voucher' && channel && wsChallenge) { + const required = BigInt(msg.data.requiredCumulative) + const accepted = BigInt(msg.data.acceptedCumulative ?? '0') + const deposit = BigInt(msg.data.deposit ?? '0') + + // Pre-authorize a batch of chunks to reduce round-trips. + // On first need-voucher (accepted=0), use `required` as the unit price + // since the server tells us exactly what one unit costs. + // On subsequent vouchers, derive unit price from the delta. + const unitPrice = accepted === 0n + ? (required > 0n ? required : 1n) + : (required > accepted ? required - accepted : 1n) + const batchTarget = required + unitPrice * 99n // ~100 chunks total + // Cap at deposit to avoid exceeding on-chain balance + const capped = deposit > 0n && batchTarget > deposit ? deposit : batchTarget + const newCumulative = capped > required ? capped : required + + channel.cumulativeAmount = + channel.cumulativeAmount > newCumulative ? channel.cumulativeAmount : newCumulative + + try { + const credential = await method.createCredential({ + challenge: wsChallenge as never, + context: { + action: 'voucher', + channelId: channel.channelId, + cumulativeAmountRaw: channel.cumulativeAmount.toString(), + }, + }) + ws.send( + JSON.stringify({ mpp: 'voucher', mppVersion: '1', authorization: credential }), + ) + } catch (err) { + console.error('[mppx] ws voucher creation failed:', err) + ws.close(1011, 'Failed to create payment credential') + } + } else if (msg.mpp === 'payment-receipt') { + updateSpentFromReceipt(msg.data) + onReceipt?.(msg.data) + } + } catch { + // Not JSON — not an MPP message, ignore + } + }) + + if (signal) { + signal.addEventListener('abort', () => ws.close(), { once: true }) + } + + return ws + }, + async close() { if (!channel?.opened || !lastChallenge) return undefined