diff --git a/packages/ibkr/src/connection.ts b/packages/ibkr/src/connection.ts index db40ee21..789ad791 100644 --- a/packages/ibkr/src/connection.ts +++ b/packages/ibkr/src/connection.ts @@ -47,14 +47,25 @@ export class Connection extends EventEmitter { }) this.socket.on('close', () => { + // Guard: if socket is already null, disconnect() already handled cleanup. + // Without this check, connectionClosed() would be called twice when + // disconnect() is invoked (once by disconnect, once by the close event). + if (this.socket === null) return this.socket = null if (this.wrapper) { this.wrapper.connectionClosed() } }) - this.socket.on('error', (err: Error) => { - this.emit('error', err) + // Python equivalent: recvMsg() catches socket.error → calls self.disconnect(). + // We do the same — disconnect the socket so the 'close' path or the direct + // disconnect() call triggers wrapper.connectionClosed(), which lets upper + // layers (UTA health tracking) handle the failure gracefully. + // + // DO NOT emit('error') here — no listener exists in the call chain, and + // Node's EventEmitter crashes the process on unhandled 'error' events. + this.socket.on('error', () => { + this.disconnect() }) this.socket.connect(this.port, this.host, () => { @@ -72,9 +83,10 @@ export class Connection extends EventEmitter { disconnect(): void { if (this.socket !== null) { - this.socket.destroy() - this.socket = null - if (this.wrapper) { + const s = this.socket + this.socket = null // Set null BEFORE destroy — the 'close' event + s.destroy() // handler checks this to avoid double-calling + if (this.wrapper) { // wrapper.connectionClosed(). 
this.wrapper.connectionClosed() } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 31de3fbe..60ec16a6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -225,6 +225,9 @@ importers: react-router-dom: specifier: ^7.13.1 version: 7.13.1(react-dom@19.2.4(react@19.2.4))(react@19.2.4) + recharts: + specifier: ^3.8.0 + version: 3.8.0(@types/react@19.2.14)(react-dom@19.2.4(react@19.2.4))(react-is@17.0.2)(react@19.2.4)(redux@5.0.1) devDependencies: '@tailwindcss/vite': specifier: ^4.1.8 @@ -1003,6 +1006,17 @@ packages: '@protobufjs/utf8@1.1.0': resolution: {integrity: sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==} + '@reduxjs/toolkit@2.11.2': + resolution: {integrity: sha512-Kd6kAHTA6/nUpp8mySPqj3en3dm0tdMIgbttnQ1xFMVpufoj+ADi8pXLBsd4xzTRHQa7t/Jv8W5UnCuW4kuWMQ==} + peerDependencies: + react: ^16.9.0 || ^17.0.0 || ^18 || ^19 + react-redux: ^7.2.1 || ^8.1.3 || ^9.0.0 + peerDependenciesMeta: + react: + optional: true + react-redux: + optional: true + '@rolldown/pluginutils@1.0.0-beta.27': resolution: {integrity: sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA==} @@ -1150,6 +1164,9 @@ packages: '@standard-schema/spec@1.1.0': resolution: {integrity: sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==} + '@standard-schema/utils@0.3.0': + resolution: {integrity: sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g==} + '@tailwindcss/node@4.2.1': resolution: {integrity: sha512-jlx6sLk4EOwO6hHe1oCGm1Q4AN/s0rSrTTPBGPM0/RQ6Uylwq17FuU8IeJJKEjtc6K6O07zsvP+gDO6MMWo7pg==} @@ -1300,6 +1317,33 @@ packages: '@types/connect@3.4.38': resolution: {integrity: sha512-K6uROf1LD88uDQqJCktA4yzL1YYAK6NgfsI0v/mTgyPKWsX1CnJ0XPSDhViejru1GcRkLWb8RlzFYJRqGUbaug==} + '@types/d3-array@3.2.2': + resolution: {integrity: sha512-hOLWVbm7uRza0BYXpIIW5pxfrKe0W+D5lrFiAEYR+pb6w3N2SwSMaJbXdUfSEv+dT4MfHBLtn5js0LAWaO6otw==} + + 
'@types/d3-color@3.1.3': + resolution: {integrity: sha512-iO90scth9WAbmgv7ogoq57O9YpKmFBbmoEoCHDB2xMBY0+/KVrqAaCDyCE16dUspeOvIxFFRI+0sEtqDqy2b4A==} + + '@types/d3-ease@3.0.2': + resolution: {integrity: sha512-NcV1JjO5oDzoK26oMzbILE6HW7uVXOHLQvHshBUW4UMdZGfiY6v5BeQwh9a9tCzv+CeefZQHJt5SRgK154RtiA==} + + '@types/d3-interpolate@3.0.4': + resolution: {integrity: sha512-mgLPETlrpVV1YRJIglr4Ez47g7Yxjl1lj7YKsiMCb27VJH9W8NVM6Bb9d8kkpG/uAQS5AmbA48q2IAolKKo1MA==} + + '@types/d3-path@3.1.1': + resolution: {integrity: sha512-VMZBYyQvbGmWyWVea0EHs/BwLgxc+MKi1zLDCONksozI4YJMcTt8ZEuIR4Sb1MMTE8MMW49v0IwI5+b7RmfWlg==} + + '@types/d3-scale@4.0.9': + resolution: {integrity: sha512-dLmtwB8zkAeO/juAMfnV+sItKjlsw2lKdZVVy6LRr0cBmegxSABiLEpGVmSJJ8O08i4+sGR6qQtb6WtuwJdvVw==} + + '@types/d3-shape@3.1.8': + resolution: {integrity: sha512-lae0iWfcDeR7qt7rA88BNiqdvPS5pFVPpo5OfjElwNaT2yyekbM0C9vK+yqBqEmHr6lDkRnYNoTBYlAgJa7a4w==} + + '@types/d3-time@3.0.4': + resolution: {integrity: sha512-yuzZug1nkAAaBlBBikKZTgzCeA+k1uy4ZFwWANOfKw5z5LRhV0gNA7gNkKm7HoK+HRN0wX3EkxGk0fpbWhmB7g==} + + '@types/d3-timer@3.0.2': + resolution: {integrity: sha512-Ps3T8E8dZDam6fUyNiMkekK3XUsaUEik+idO9/YjPtfj2qruF8tFBXS7XhtE4iIXBLxhmLjP3SXpLhVf21I9Lw==} + '@types/deep-eql@4.0.2': resolution: {integrity: sha512-c9h9dVVMigMPc4bwTvC5dxqtqJZwQPePsWjPlpSOnojbor6pGqdk541lfA7AqFQr5pB1BRdq0juY9db81BwyFw==} @@ -1344,6 +1388,9 @@ packages: '@types/trusted-types@2.0.7': resolution: {integrity: sha512-ScaPdn1dQczgbl0QFTeTOmVHFULt394XJgOQNoyVhZ6r2vLnMLJfBPd53SB52T/3G36VI1/g2MZaX0cwDuXsfw==} + '@types/use-sync-external-store@0.0.6': + resolution: {integrity: sha512-zFDAD+tlpf2r4asuHEj0XH6pY6i0g5NeAHPn+15wk3BV6JA69eERFXC1gyGThDkVa1zCyKr5jox1+2LbV/AMLg==} + '@types/ws@8.18.1': resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} @@ -1564,6 +1611,10 @@ packages: resolution: {integrity: 
sha512-Qgzu8kfBvo+cA4962jnP1KkS6Dop5NS6g7R5LFYJr4b8Ub94PPQXUksCw9PvXoeXPRRddRNC5C1JQUR2SMGtnA==} engines: {node: '>= 14.16.0'} + clsx@2.1.1: + resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==} + engines: {node: '>=6'} + combined-stream@1.0.8: resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} engines: {node: '>= 0.8'} @@ -1617,6 +1668,50 @@ packages: csstype@3.2.3: resolution: {integrity: sha512-z1HGKcYy2xA8AGQfwrn0PAy+PB7X/GSj3UVJW9qKyn43xWa+gl5nXmU4qqLMRzWVLFC8KusUX8T/0kCiOYpAIQ==} + d3-array@3.2.4: + resolution: {integrity: sha512-tdQAmyA18i4J7wprpYq8ClcxZy3SC31QMeByyCFyRt7BVHdREQZ5lpzoe5mFEYZUWe+oq8HBvk9JjpibyEV4Jg==} + engines: {node: '>=12'} + + d3-color@3.1.0: + resolution: {integrity: sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA==} + engines: {node: '>=12'} + + d3-ease@3.0.1: + resolution: {integrity: sha512-wR/XK3D3XcLIZwpbvQwQ5fK+8Ykds1ip7A2Txe0yxncXSdq1L9skcG7blcedkOX+ZcgxGAmLX1FrRGbADwzi0w==} + engines: {node: '>=12'} + + d3-format@3.1.2: + resolution: {integrity: sha512-AJDdYOdnyRDV5b6ArilzCPPwc1ejkHcoyFarqlPqT7zRYjhavcT3uSrqcMvsgh2CgoPbK3RCwyHaVyxYcP2Arg==} + engines: {node: '>=12'} + + d3-interpolate@3.0.1: + resolution: {integrity: sha512-3bYs1rOD33uo8aqJfKP3JWPAibgw8Zm2+L9vBKEHJ2Rg+viTR7o5Mmv5mZcieN+FRYaAOWX5SJATX6k1PWz72g==} + engines: {node: '>=12'} + + d3-path@3.1.0: + resolution: {integrity: sha512-p3KP5HCf/bvjBSSKuXid6Zqijx7wIfNW+J/maPs+iwR35at5JCbLUT0LzF1cnjbCHWhqzQTIN2Jpe8pRebIEFQ==} + engines: {node: '>=12'} + + d3-scale@4.0.2: + resolution: {integrity: sha512-GZW464g1SH7ag3Y7hXjf8RoUuAFIqklOAq3MRl4OaWabTFJY9PN/E1YklhXLh+OQ3fM9yS2nOkCoS+WLZ6kvxQ==} + engines: {node: '>=12'} + + d3-shape@3.2.0: + resolution: {integrity: sha512-SaLBuwGm3MOViRq2ABk3eLoxwZELpH6zhl3FbAoJ7Vm1gofKx6El1Ib5z23NUEhF9AsGl7y+dzLe5Cw2AArGTA==} + engines: {node: '>=12'} + + 
d3-time-format@4.1.0: + resolution: {integrity: sha512-dJxPBlzC7NugB2PDLwo9Q8JiTR3M3e4/XANkreKSUxF8vvXKqm1Yfq4Q5dl8budlunRVlUUaDUgFt7eA8D6NLg==} + engines: {node: '>=12'} + + d3-time@3.1.0: + resolution: {integrity: sha512-VqKjzBLejbSMT4IgbmVgDjpkYrNWUYJnbCGo874u7MMKIWsILRX+OpX/gTk8MqjpT1A/c6HY2dCA77ZN0lkQ2Q==} + engines: {node: '>=12'} + + d3-timer@3.0.1: + resolution: {integrity: sha512-ndfJ/JxxMd3nw31uyKoY2naivF+r29V+Lc0svZxe1JvvIRmi8hUsrMvdOwgS1o6uBHmiz91geQ0ylPP0aj1VUA==} + engines: {node: '>=12'} + data-urls@7.0.0: resolution: {integrity: sha512-23XHcCF+coGYevirZceTVD7NdJOqVn+49IHyxgszm+JIiHLoB2TkmPtsYkNWT1pvRSGkc35L6NHs0yHkN2SumA==} engines: {node: ^20.19.0 || ^22.12.0 || >=24.0.0} @@ -1630,6 +1725,9 @@ packages: supports-color: optional: true + decimal.js-light@2.5.1: + resolution: {integrity: sha512-qIMFpTMZmny+MMIitAB6D7iVPEorVw6YQRWkvarTkT4tBeSLLiHzcwj6q0MmYSFCiVpiqPJTJEYIrpcPzVEIvg==} + decimal.js@10.6.0: resolution: {integrity: sha512-YpgQiITW3JXGntzdUmyUR1V812Hn8T1YVXhCu+wO3OpS4eU9l4YdD3qjyiKdV6mvV29zapkMeD390UVEf2lkUg==} @@ -1712,6 +1810,9 @@ packages: resolution: {integrity: sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==} engines: {node: '>= 0.4'} + es-toolkit@1.45.1: + resolution: {integrity: sha512-/jhoOj/Fx+A+IIyDNOvO3TItGmlMKhtX8ISAHKE90c4b/k1tqaqEZ+uUqfpU8DMnW5cgNJv606zS55jGvza0Xw==} + esbuild@0.25.12: resolution: {integrity: sha512-bbPBYYrtZbkt6Os6FiTLCTFxvq4tt3JKall1vRwshA3fdVztsLAatFaZobhkBC8/BrPetoa0oksYoKXoG4ryJg==} engines: {node: '>=18'} @@ -1744,6 +1845,9 @@ packages: resolution: {integrity: sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==} engines: {node: '>=6'} + eventemitter3@5.0.4: + resolution: {integrity: sha512-mlsTRyGaPBjPedk6Bvw+aqbsXDtoAyAzm5MO7JgU+yVRyMQ5O8bD4Kcci7BS85f93veegeCPkL8R4GLClnjLFw==} + events@3.3.0: resolution: {integrity: sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==} 
engines: {node: '>=0.8.x'} @@ -1915,9 +2019,19 @@ packages: ieee754@1.2.1: resolution: {integrity: sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==} + immer@10.2.0: + resolution: {integrity: sha512-d/+XTN3zfODyjr89gM3mPq1WNX2B8pYsu7eORitdwyA2sBubnTl3laYlBk4sXY5FUa5qTZGBDPJICVbvqzjlbw==} + + immer@11.1.4: + resolution: {integrity: sha512-XREFCPo6ksxVzP4E0ekD5aMdf8WMwmdNaz6vuvxgI40UaEiu6q3p8X52aU6GdyvLY3XXX/8R7JOTXStz/nBbRw==} + inherits@2.0.4: resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==} + internmap@2.0.3: + resolution: {integrity: sha512-5Hh7Y1wQbvY5ooGgPbDaL5iYLAPzMTUrjMulskHLH6wnv/A+1q5rgEaiuqEjB+oxGXIVZs1FF+R/KPN3ZSQYYg==} + engines: {node: '>=12'} + ip-address@10.0.1: resolution: {integrity: sha512-NWv9YLW4PoW2B7xtzaS3NCot75m6nK7Icdv0o3lfMceJVRfSoQwqD4wEH5rLwoKJwUiZ/rfpiVBhnaF0FK4HoA==} engines: {node: '>= 12'} @@ -2356,6 +2470,18 @@ packages: react-is@17.0.2: resolution: {integrity: sha512-w2GsyukL62IJnlaff/nRegPQR94C/XXamvMWmSHRJ4y7Ts/4ocGRmTHvOs8PSE6pB3dWOrD/nueuU5sduBsQ4w==} + react-redux@9.2.0: + resolution: {integrity: sha512-ROY9fvHhwOD9ySfrF0wmvu//bKCQ6AeZZq1nJNtbDC+kk5DuSuNX/n6YWYF/SYy7bSba4D4FSz8DJeKY/S/r+g==} + peerDependencies: + '@types/react': ^18.2.25 || ^19 + react: ^18.0 || ^19 + redux: ^5.0.0 + peerDependenciesMeta: + '@types/react': + optional: true + redux: + optional: true + react-refresh@0.17.0: resolution: {integrity: sha512-z6F7K9bV85EfseRCp2bzrpyQ0Gkw1uLoCel9XBVWPg/TjRj94SkJzUTGfOa4bs7iJvBWtQG0Wq7wnI0syw3EBQ==} engines: {node: '>=0.10.0'} @@ -2393,6 +2519,22 @@ packages: resolution: {integrity: sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==} engines: {node: '>= 12.13.0'} + recharts@3.8.0: + resolution: {integrity: sha512-Z/m38DX3L73ExO4Tpc9/iZWHmHnlzWG4njQbxsF5aSjwqmHNDDIm0rdEBArkwsBvR8U6EirlEHiQNYWCVh9sGQ==} + engines: {node: '>=18'} + peerDependencies: + react: 
^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + react-dom: ^16.0.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + react-is: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + + redux-thunk@3.1.0: + resolution: {integrity: sha512-NW2r5T6ksUKXCabzhL9z+h206HQw/NJkcLm1GPImRQ8IzfXwRGqjVhKJGauHirT0DAuyy6hjdnMZaRoAcy0Klw==} + peerDependencies: + redux: ^5.0.0 + + redux@5.0.1: + resolution: {integrity: sha512-M9/ELqF6fy8FwmkpnF0S3YKOqMyoWJ4+CS5Efg2ct3oY9daQvd/Pc71FpGZsVsbl3Cpb+IIcjBDUnnyBdQbq4w==} + require-from-string@2.0.2: resolution: {integrity: sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==} engines: {node: '>=0.10.0'} @@ -2400,6 +2542,9 @@ packages: requires-port@1.0.0: resolution: {integrity: sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==} + reselect@5.1.1: + resolution: {integrity: sha512-K/BG6eIky/SBpzfHZv/dd+9JBFiS4SWV7FIujVyJRux6e45+73RaUHXLmIR1f7WOMaQ0U1km6qwklRQxpJJY0w==} + resolve-from@5.0.0: resolution: {integrity: sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==} engines: {node: '>=8'} @@ -2556,6 +2701,9 @@ packages: resolution: {integrity: sha512-4iMVL6HAINXWf1ZKZjIPcz5wYaOdPhtO8ATvZ+Xqp3BTdaqtAwQkNmKORqcIo5YkQqGXq5cwfswDwMqqQNrpJA==} engines: {node: '>=20'} + tiny-invariant@1.3.3: + resolution: {integrity: sha512-+FbBPE1o9QAYvviau/qC5SE3caw21q3xkvWKBtja5vgqOWIHHJ3ioaq1VPfn/Szqctz2bU/oYeKd9/z5BL+PVg==} + tinybench@2.9.0: resolution: {integrity: sha512-0+DUvqWMValLmha6lr4kD8iAMK1HzV0/aKnCtWb9v9641TnP/MFb7Pc2bxoxQjTXAErryXVgUOfv2YqNllqGeg==} @@ -2735,6 +2883,11 @@ packages: urljoin@0.1.5: resolution: {integrity: sha512-OSGi+PS3zxk8XfQ+7buaupOdrW9P9p+V9rjxGzJaYEYDe/B2rv3WJCupq5LNERW4w4kWxsduUUrhCxZZiQ2udw==} + use-sync-external-store@1.6.0: + resolution: {integrity: sha512-Pp6GSwGP/NrPIrxVFAIkOQeyw8lFenOHijQWkUTrDvrF4ALqylP2C/KCkeS9dpUM3KvYRQhna5vt7IL95+ZQ9w==} + peerDependencies: + react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + 
util-deprecate@1.0.2: resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==} @@ -2742,6 +2895,9 @@ packages: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} + victory-vendor@37.3.6: + resolution: {integrity: sha512-SbPDPdDBYp+5MJHhBCAyI7wKM3d5ivekigc2Dk2s7pgbZ9wIgIBYGVw4zGHBml/qTFbexrofXW6Gu4noGxrOwQ==} + vite-node@3.2.4: resolution: {integrity: sha512-EbKSKh+bh1E1IFxeO0pg1n4dvoOTt0UDiXMd/qn++r98+jPO1xtJilvXldeuQ8giIB5IkpjCgMleHMNEsGH6pg==} engines: {node: ^18.0.0 || ^20.0.0 || >=22.0.0} @@ -3561,6 +3717,18 @@ snapshots: '@protobufjs/utf8@1.1.0': {} + '@reduxjs/toolkit@2.11.2(react-redux@9.2.0(@types/react@19.2.14)(react@19.2.4)(redux@5.0.1))(react@19.2.4)': + dependencies: + '@standard-schema/spec': 1.1.0 + '@standard-schema/utils': 0.3.0 + immer: 11.1.4 + redux: 5.0.1 + redux-thunk: 3.1.0(redux@5.0.1) + reselect: 5.1.1 + optionalDependencies: + react: 19.2.4 + react-redux: 9.2.0(@types/react@19.2.14)(react@19.2.4)(redux@5.0.1) + '@rolldown/pluginutils@1.0.0-beta.27': {} '@rollup/rollup-android-arm-eabi@4.57.1': @@ -3642,6 +3810,8 @@ snapshots: '@standard-schema/spec@1.1.0': {} + '@standard-schema/utils@0.3.0': {} + '@tailwindcss/node@4.2.1': dependencies: '@jridgewell/remapping': 2.3.5 @@ -3781,6 +3951,30 @@ snapshots: dependencies: '@types/node': 25.2.3 + '@types/d3-array@3.2.2': {} + + '@types/d3-color@3.1.3': {} + + '@types/d3-ease@3.0.2': {} + + '@types/d3-interpolate@3.0.4': + dependencies: + '@types/d3-color': 3.1.3 + + '@types/d3-path@3.1.1': {} + + '@types/d3-scale@4.0.9': + dependencies: + '@types/d3-time': 3.0.4 + + '@types/d3-shape@3.1.8': + dependencies: + '@types/d3-path': 3.1.1 + + '@types/d3-time@3.0.4': {} + + '@types/d3-timer@3.0.2': {} + '@types/deep-eql@4.0.2': {} '@types/estree@1.0.8': {} @@ -3832,6 +4026,8 @@ snapshots: '@types/trusted-types@2.0.7': optional: true + 
'@types/use-sync-external-store@0.0.6': {} + '@types/ws@8.18.1': dependencies: '@types/node': 25.2.3 @@ -4094,6 +4290,8 @@ snapshots: dependencies: readdirp: 4.1.2 + clsx@2.1.1: {} + combined-stream@1.0.8: dependencies: delayed-stream: 1.0.0 @@ -4134,6 +4332,44 @@ snapshots: csstype@3.2.3: {} + d3-array@3.2.4: + dependencies: + internmap: 2.0.3 + + d3-color@3.1.0: {} + + d3-ease@3.0.1: {} + + d3-format@3.1.2: {} + + d3-interpolate@3.0.1: + dependencies: + d3-color: 3.1.0 + + d3-path@3.1.0: {} + + d3-scale@4.0.2: + dependencies: + d3-array: 3.2.4 + d3-format: 3.1.2 + d3-interpolate: 3.0.1 + d3-time: 3.1.0 + d3-time-format: 4.1.0 + + d3-shape@3.2.0: + dependencies: + d3-path: 3.1.0 + + d3-time-format@4.1.0: + dependencies: + d3-time: 3.1.0 + + d3-time@3.1.0: + dependencies: + d3-array: 3.2.4 + + d3-timer@3.0.1: {} + data-urls@7.0.0: dependencies: whatwg-mimetype: 5.0.0 @@ -4145,6 +4381,8 @@ snapshots: dependencies: ms: 2.1.3 + decimal.js-light@2.5.1: {} + decimal.js@10.6.0: {} deep-eql@5.0.2: {} @@ -4207,6 +4445,8 @@ snapshots: has-tostringtag: 1.0.2 hasown: 2.0.2 + es-toolkit@1.45.1: {} + esbuild@0.25.12: optionalDependencies: '@esbuild/aix-ppc64': 0.25.12 @@ -4279,6 +4519,8 @@ snapshots: event-target-shim@5.0.1: {} + eventemitter3@5.0.4: {} + events@3.3.0: {} eventsource-parser@3.0.6: {} @@ -4482,8 +4724,14 @@ snapshots: ieee754@1.2.1: {} + immer@10.2.0: {} + + immer@11.1.4: {} + inherits@2.0.4: {} + internmap@2.0.3: {} + ip-address@10.0.1: {} ipaddr.js@1.9.1: {} @@ -4851,6 +5099,15 @@ snapshots: react-is@17.0.2: {} + react-redux@9.2.0(@types/react@19.2.14)(react@19.2.4)(redux@5.0.1): + dependencies: + '@types/use-sync-external-store': 0.0.6 + react: 19.2.4 + use-sync-external-store: 1.6.0(react@19.2.4) + optionalDependencies: + '@types/react': 19.2.14 + redux: 5.0.1 + react-refresh@0.17.0: {} react-router-dom@7.13.1(react-dom@19.2.4(react@19.2.4))(react@19.2.4): @@ -4879,10 +5136,38 @@ snapshots: real-require@0.2.0: {} + 
recharts@3.8.0(@types/react@19.2.14)(react-dom@19.2.4(react@19.2.4))(react-is@17.0.2)(react@19.2.4)(redux@5.0.1): + dependencies: + '@reduxjs/toolkit': 2.11.2(react-redux@9.2.0(@types/react@19.2.14)(react@19.2.4)(redux@5.0.1))(react@19.2.4) + clsx: 2.1.1 + decimal.js-light: 2.5.1 + es-toolkit: 1.45.1 + eventemitter3: 5.0.4 + immer: 10.2.0 + react: 19.2.4 + react-dom: 19.2.4(react@19.2.4) + react-is: 17.0.2 + react-redux: 9.2.0(@types/react@19.2.14)(react@19.2.4)(redux@5.0.1) + reselect: 5.1.1 + tiny-invariant: 1.3.3 + use-sync-external-store: 1.6.0(react@19.2.4) + victory-vendor: 37.3.6 + transitivePeerDependencies: + - '@types/react' + - redux + + redux-thunk@3.1.0(redux@5.0.1): + dependencies: + redux: 5.0.1 + + redux@5.0.1: {} + require-from-string@2.0.2: {} requires-port@1.0.0: {} + reselect@5.1.1: {} + resolve-from@5.0.0: {} resolve-pkg-maps@1.0.0: {} @@ -5104,6 +5389,8 @@ snapshots: dependencies: real-require: 0.2.0 + tiny-invariant@1.3.3: {} + tinybench@2.9.0: {} tinyexec@0.3.2: {} @@ -5274,10 +5561,31 @@ snapshots: dependencies: extend: 2.0.2 + use-sync-external-store@1.6.0(react@19.2.4): + dependencies: + react: 19.2.4 + util-deprecate@1.0.2: {} vary@1.1.2: {} + victory-vendor@37.3.6: + dependencies: + '@types/d3-array': 3.2.2 + '@types/d3-ease': 3.0.2 + '@types/d3-interpolate': 3.0.4 + '@types/d3-scale': 4.0.9 + '@types/d3-shape': 3.1.8 + '@types/d3-time': 3.0.4 + '@types/d3-timer': 3.0.2 + d3-array: 3.2.4 + d3-ease: 3.0.1 + d3-interpolate: 3.0.1 + d3-scale: 4.0.2 + d3-shape: 3.2.0 + d3-time: 3.1.0 + d3-timer: 3.0.1 + vite-node@3.2.4(@types/node@22.19.15)(jiti@2.6.1)(lightningcss@1.31.1)(tsx@4.21.0): dependencies: cac: 6.7.14 diff --git a/src/connectors/web/routes/trading.ts b/src/connectors/web/routes/trading.ts index 23140c32..32434193 100644 --- a/src/connectors/web/routes/trading.ts +++ b/src/connectors/web/routes/trading.ts @@ -164,5 +164,84 @@ export function createTradingRoutes(ctx: EngineContext) { } }) + // ==================== Snapshot routes 
==================== + + // Per-account snapshots + app.get('/accounts/:id/snapshots', async (c) => { + if (!ctx.snapshotService) return c.json({ snapshots: [] }) + const id = c.req.param('id') + const limit = Number(c.req.query('limit')) || 100 + try { + const snapshots = await ctx.snapshotService.getRecent(id, limit) + return c.json({ snapshots }) + } catch { + return c.json({ snapshots: [] }) + } + }) + + // Aggregated equity curve across all accounts + app.get('/snapshots/equity-curve', async (c) => { + if (!ctx.snapshotService) return c.json({ points: [] }) + const limit = Number(c.req.query('limit')) || 200 + + try { + const accounts = ctx.accountManager.resolve() + // Gather snapshots per account + const perAccount = await Promise.all( + accounts.map(async (uta) => { + const snaps = await ctx.snapshotService!.getRecent(uta.id, limit) + return { id: uta.id, label: uta.label, snaps } + }), + ) + + // Build time-indexed map: group snapshots by minute-rounded timestamp + const timeMap = new Map }>() + + for (const { id: accId, snaps } of perAccount) { + for (const snap of snaps) { + // Round to nearest minute for grouping + const d = new Date(snap.timestamp) + d.setSeconds(0, 0) + const key = d.toISOString() + + let entry = timeMap.get(key) + if (!entry) { + entry = { equity: 0, accounts: {} } + timeMap.set(key, entry) + } + entry.accounts[accId] = snap.account.netLiquidation + // Recalculate total equity from all accounts at this time + entry.equity = Object.values(entry.accounts).reduce((s, v) => s + (Number(v) || 0), 0) + } + } + + // Sort chronologically + const sorted = Array.from(timeMap.entries()) + .sort((a, b) => a[0].localeCompare(b[0])) + + // Carry forward: fill missing accounts with their last known value + const allAccountIds = accounts.map(a => a.id) + const lastKnown: Record = {} + + const points = sorted.map(([timestamp, { accounts: accs }]) => { + // Fill missing accounts from last known + for (const id of allAccountIds) { + if (!(id in accs) 
&& id in lastKnown) { + accs[id] = lastKnown[id] + } + } + // Update last known + Object.assign(lastKnown, accs) + // Recalculate equity with filled values + const equity = Object.values(accs).reduce((s, v) => s + (Number(v) || 0), 0) + return { timestamp, equity: String(equity), accounts: accs } + }) + + return c.json({ points }) + } catch { + return c.json({ points: [] }) + } + }) + return app } diff --git a/src/core/config.ts b/src/core/config.ts index 66d1650b..3e190278 100644 --- a/src/core/config.ts +++ b/src/core/config.ts @@ -164,6 +164,11 @@ const heartbeatSchema = z.object({ activeHours: activeHoursSchema, }) +const snapshotSchema = z.object({ + enabled: z.boolean().default(true), + every: z.string().default('15m'), +}) + export const toolsSchema = z.object({ /** Tool names that are disabled. Tools not listed are enabled by default. */ disabled: z.array(z.string()).default([]), @@ -236,6 +241,7 @@ export type Config = { compaction: z.infer aiProvider: z.infer heartbeat: z.infer + snapshot: z.infer connectors: z.infer news: z.infer tools: z.infer @@ -271,7 +277,7 @@ async function parseAndSeed(filename: string, schema: z.ZodType, raw: unkn } export async function loadConfig(): Promise { - const files = ['engine.json', 'agent.json', 'crypto.json', 'securities.json', 'market-data.json', 'compaction.json', 'ai-provider-manager.json', 'heartbeat.json', 'connectors.json', 'news.json', 'tools.json'] as const + const files = ['engine.json', 'agent.json', 'crypto.json', 'securities.json', 'market-data.json', 'compaction.json', 'ai-provider-manager.json', 'heartbeat.json', 'snapshot.json', 'connectors.json', 'news.json', 'tools.json'] as const const raws = await Promise.all(files.map((f) => loadJsonFile(f))) // TODO: remove all migration blocks before v1.0 — no stable release yet, breaking changes are fine @@ -304,7 +310,7 @@ export async function loadConfig(): Promise { } // ---------- Migration: consolidate old telegram.json + engine port fields ---------- - 
const connectorsRaw = raws[8] as Record | undefined + const connectorsRaw = raws[9] as Record | undefined if (connectorsRaw === undefined) { const oldTelegram = await loadJsonFile('telegram.json') const oldEngine = raws[0] as Record | undefined @@ -321,7 +327,7 @@ export async function loadConfig(): Promise { await mkdir(CONFIG_DIR, { recursive: true }) await writeFile(resolve(CONFIG_DIR, 'engine.json'), JSON.stringify(cleanEngine, null, 2) + '\n') } - raws[8] = Object.keys(migrated).length > 0 ? migrated : undefined + raws[9] = Object.keys(migrated).length > 0 ? migrated : undefined } return { @@ -333,9 +339,10 @@ export async function loadConfig(): Promise { compaction: await parseAndSeed(files[5], compactionSchema, raws[5]), aiProvider: await parseAndSeed(files[6], aiProviderSchema, raws[6]), heartbeat: await parseAndSeed(files[7], heartbeatSchema, raws[7]), - connectors: await parseAndSeed(files[8], connectorsSchema, raws[8]), - news: await parseAndSeed(files[9], newsCollectorSchema, raws[9]), - tools: await parseAndSeed(files[10], toolsSchema, raws[10]), + snapshot: await parseAndSeed(files[8], snapshotSchema, raws[8]), + connectors: await parseAndSeed(files[9], connectorsSchema, raws[9]), + news: await parseAndSeed(files[10], newsCollectorSchema, raws[10]), + tools: await parseAndSeed(files[11], toolsSchema, raws[11]), } } @@ -455,6 +462,7 @@ const sectionSchemas: Record = { compaction: compactionSchema, aiProvider: aiProviderSchema, heartbeat: heartbeatSchema, + snapshot: snapshotSchema, connectors: connectorsSchema, news: newsCollectorSchema, tools: toolsSchema, @@ -469,6 +477,7 @@ const sectionFiles: Record = { compaction: 'compaction.json', aiProvider: 'ai-provider-manager.json', heartbeat: 'heartbeat.json', + snapshot: 'snapshot.json', connectors: 'connectors.json', news: 'news.json', tools: 'tools.json', diff --git a/src/core/types.ts b/src/core/types.ts index 5b16d9dd..744bad2c 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -1,4 +1,5 @@ 
import type { AccountManager } from '../domain/trading/index.js' +import type { SnapshotService } from '../domain/trading/snapshot/index.js' import type { CronEngine } from '../task/cron/engine.js' import type { Heartbeat } from '../task/heartbeat/index.js' import type { Config, WebChannel } from './config.js' @@ -34,6 +35,7 @@ export interface EngineContext { // Trading (unified account model) accountManager: AccountManager + snapshotService?: SnapshotService /** Reconnect connector plugins (Telegram, MCP-Ask, etc.). */ reconnectConnectors: () => Promise } diff --git a/src/domain/trading/UnifiedTradingAccount.ts b/src/domain/trading/UnifiedTradingAccount.ts index 7abe41c2..22aaad4b 100644 --- a/src/domain/trading/UnifiedTradingAccount.ts +++ b/src/domain/trading/UnifiedTradingAccount.ts @@ -58,6 +58,8 @@ export interface UnifiedTradingAccountOptions { savedState?: GitExportState onCommit?: (state: GitExportState) => void | Promise onHealthChange?: (accountId: string, health: BrokerHealthInfo) => void + onPostPush?: (accountId: string) => void | Promise + onPostReject?: (accountId: string) => void | Promise } // ==================== Stage param types ==================== @@ -108,6 +110,8 @@ export class UnifiedTradingAccount { private readonly _getState: () => Promise private readonly _onHealthChange?: (accountId: string, health: BrokerHealthInfo) => void + private readonly _onPostPush?: (accountId: string) => void | Promise + private readonly _onPostReject?: (accountId: string) => void | Promise // ---- Health tracking ---- private static readonly DEGRADED_THRESHOLD = 3 @@ -129,6 +133,8 @@ export class UnifiedTradingAccount { this.id = broker.id this.label = broker.label this._onHealthChange = options.onHealthChange + this._onPostPush = options.onPostPush + this._onPostReject = options.onPostReject // Wire internals this._getState = async (): Promise => { @@ -423,11 +429,15 @@ export class UnifiedTradingAccount { if (this.health === 'offline') { throw new 
Error(`Account "${this.label}" is offline. Cannot execute trades.`) } - return this.git.push() + const result = await this.git.push() + Promise.resolve(this._onPostPush?.(this.id)).catch(() => {}) + return result } - reject(reason?: string): Promise { - return this.git.reject(reason) + async reject(reason?: string): Promise { + const result = await this.git.reject(reason) + Promise.resolve(this._onPostReject?.(this.id)).catch(() => {}) + return result } // ==================== Git queries ==================== diff --git a/src/domain/trading/account-manager.ts b/src/domain/trading/account-manager.ts index d2cb87ce..b7b6b737 100644 --- a/src/domain/trading/account-manager.ts +++ b/src/domain/trading/account-manager.ts @@ -53,18 +53,28 @@ export interface ContractSearchResult { // ==================== AccountManager ==================== +export interface SnapshotHooks { + onPostPush?: (accountId: string) => void | Promise + onPostReject?: (accountId: string) => void | Promise +} + export class AccountManager { private entries = new Map() private reconnecting = new Set() private eventLog?: EventLog private toolCenter?: ToolCenter + private _snapshotHooks?: SnapshotHooks constructor(deps?: { eventLog: EventLog; toolCenter: ToolCenter }) { this.eventLog = deps?.eventLog this.toolCenter = deps?.toolCenter } + setSnapshotHooks(hooks: SnapshotHooks): void { + this._snapshotHooks = hooks + } + // ==================== Lifecycle ==================== /** Create a UTA from account config, register it, and start async broker connection. 
*/ @@ -78,6 +88,8 @@ export class AccountManager { onHealthChange: (accountId, health) => { this.eventLog?.append('account.health', { accountId, ...health }) }, + onPostPush: this._snapshotHooks?.onPostPush, + onPostReject: this._snapshotHooks?.onPostReject, }) this.add(uta) return uta diff --git a/src/domain/trading/brokers/ccxt/CcxtBroker.spec.ts b/src/domain/trading/brokers/ccxt/CcxtBroker.spec.ts index 0c907a50..878e5ad7 100644 --- a/src/domain/trading/brokers/ccxt/CcxtBroker.spec.ts +++ b/src/domain/trading/brokers/ccxt/CcxtBroker.spec.ts @@ -609,12 +609,15 @@ describe('CcxtBroker — getAccount', () => { free: { USDT: 8000 }, used: { USDT: 2000 }, }) + // Positions must include contracts/contractSize/markPrice so the broker + // can reconstruct netLiquidation from fresh position market values. ;(acc as any).exchange.fetchPositions = vi.fn().mockResolvedValue([ - { unrealizedPnl: 500, realizedPnl: 100 }, - { unrealizedPnl: -200, realizedPnl: 50 }, + { contracts: 1, contractSize: 1, markPrice: 1500, unrealizedPnl: 500, realizedPnl: 100, side: 'long' }, + { contracts: 1, contractSize: 1, markPrice: 500, unrealizedPnl: -200, realizedPnl: 50, side: 'long' }, ]) const info = await acc.getAccount() + // netLiq = free (8000) + position market values (1500 + 500 = 2000) = 10000 expect(info.netLiquidation).toBe(10000) expect(info.totalCashValue).toBe(8000) expect(info.initMarginReq).toBe(2000) diff --git a/src/domain/trading/brokers/ccxt/CcxtBroker.ts b/src/domain/trading/brokers/ccxt/CcxtBroker.ts index febdf2ee..11c6709a 100644 --- a/src/domain/trading/brokers/ccxt/CcxtBroker.ts +++ b/src/domain/trading/brokers/ccxt/CcxtBroker.ts @@ -417,19 +417,36 @@ export class CcxtBroker implements IBroker { ]) const bal = balance as unknown as Record> - const total = parseFloat(String(bal['total']?.['USDT'] ?? bal['total']?.['USD'] ?? 0)) const free = parseFloat(String(bal['free']?.['USDT'] ?? bal['free']?.['USD'] ?? 0)) const used = parseFloat(String(bal['used']?.['USDT'] ?? 
bal['used']?.['USD'] ?? 0)) + // Aggregate P&L and market value from positions. + // We use position-level markPrice (which is fresh from the exchange's + // websocket feed) rather than balance.total (which is a cached wallet + // snapshot that may not update between funding/settlement cycles). let unrealizedPnL = 0 let realizedPnL = 0 + let totalPositionValue = 0 for (const p of rawPositions) { unrealizedPnL += parseFloat(String(p.unrealizedPnl ?? 0)) realizedPnL += parseFloat(String((p as unknown as Record).realizedPnl ?? 0)) + + // Compute position market value from fresh markPrice + const contracts = new Decimal(String(p.contracts ?? 0)).abs() + const contractSize = new Decimal(String(p.contractSize ?? 1)) + const quantity = contracts.mul(contractSize) + const markPrice = parseFloat(String(p.markPrice ?? 0)) + totalPositionValue += quantity.toNumber() * markPrice } + // Reconstruct netLiquidation from fresh components: + // netLiq = available cash + total position market value + // This gives a real-time equity figure that tracks markPrice movements, + // unlike balance.total which only updates on exchange settlement. + const netLiquidation = free + totalPositionValue + return { - netLiquidation: total, + netLiquidation, totalCashValue: free, unrealizedPnL, realizedPnL, diff --git a/src/domain/trading/brokers/ibkr/IbkrBroker.ts b/src/domain/trading/brokers/ibkr/IbkrBroker.ts index 7541892c..d0c364eb 100644 --- a/src/domain/trading/brokers/ibkr/IbkrBroker.ts +++ b/src/domain/trading/brokers/ibkr/IbkrBroker.ts @@ -247,10 +247,40 @@ export class IbkrBroker implements IBroker { async getAccount(): Promise { const download = await this.downloadAccount() + // TotalCashValue is stable (cash doesn't change with market moves). + const totalCashValue = parseFloat(download.values.get('TotalCashValue') ?? '0') + + // Reconstruct netLiquidation and unrealizedPnL from position-level data. 
+ // + // TWS's account-level tags (NetLiquidation, UnrealizedPnL) are cached + // server-side and may not refresh between market sessions. However, the + // updatePortfolio() callbacks that populate download.positions carry + // per-position marketPrice, marketValue, and unrealizedPnL that are + // more current. + // + // Formula: netLiq = cash + Σ(position.marketValue) + // + // When there are no positions, fall back to the TWS-reported value + // since cash-only accounts have accurate NetLiquidation. + let totalMarketValue = 0 + let positionUnrealizedPnL = 0 + for (const pos of download.positions) { + totalMarketValue += pos.marketValue + positionUnrealizedPnL += pos.unrealizedPnL + } + + const netLiquidation = download.positions.length > 0 + ? totalCashValue + totalMarketValue + : parseFloat(download.values.get('NetLiquidation') ?? '0') + + const unrealizedPnL = download.positions.length > 0 + ? positionUnrealizedPnL + : parseFloat(download.values.get('UnrealizedPnL') ?? '0') + return { - netLiquidation: parseFloat(download.values.get('NetLiquidation') ?? '0'), - totalCashValue: parseFloat(download.values.get('TotalCashValue') ?? '0'), - unrealizedPnL: parseFloat(download.values.get('UnrealizedPnL') ?? '0'), + netLiquidation, + totalCashValue, + unrealizedPnL, realizedPnL: parseFloat(download.values.get('RealizedPnL') ?? '0'), buyingPower: parseFloat(download.values.get('BuyingPower') ?? '0'), initMarginReq: parseFloat(download.values.get('InitMarginReq') ?? 
'0'), diff --git a/src/domain/trading/git/TradingGit.ts b/src/domain/trading/git/TradingGit.ts index bdbbe43b..7a38a185 100644 --- a/src/domain/trading/git/TradingGit.ts +++ b/src/domain/trading/git/TradingGit.ts @@ -289,6 +289,7 @@ export class TradingGit implements ITradingGit { return { staged: [...this.stagingArea], pendingMessage: this.pendingMessage, + pendingHash: this.pendingHash, head: this.head, commitCount: this.commits.length, } diff --git a/src/domain/trading/git/types.ts b/src/domain/trading/git/types.ts index 8654625a..0cf34ab0 100644 --- a/src/domain/trading/git/types.ts +++ b/src/domain/trading/git/types.ts @@ -100,6 +100,7 @@ export interface RejectResult { export interface GitStatus { staged: Operation[] pendingMessage: string | null + pendingHash: CommitHash | null head: CommitHash | null commitCount: number } diff --git a/src/domain/trading/index.ts b/src/domain/trading/index.ts index b137b4c5..30996722 100644 --- a/src/domain/trading/index.ts +++ b/src/domain/trading/index.ts @@ -11,6 +11,7 @@ export type { AccountSummary, AggregatedEquity, ContractSearchResult, + SnapshotHooks, } from './account-manager.js' // Brokers (types + implementations + factory) @@ -57,6 +58,22 @@ export type { SimulatePriceChangeResult, } from './git/index.js' +// Snapshots +export { + createSnapshotService, + createSnapshotScheduler, + createSnapshotStore, + buildSnapshot, +} from './snapshot/index.js' +export type { + SnapshotService, + SnapshotScheduler, + SnapshotStore, + UTASnapshot, + SnapshotTrigger, + SnapshotIndex, +} from './snapshot/index.js' + // Guards export { createGuardPipeline, diff --git a/src/domain/trading/snapshot/builder.ts b/src/domain/trading/snapshot/builder.ts new file mode 100644 index 00000000..ab6c2c0d --- /dev/null +++ b/src/domain/trading/snapshot/builder.ts @@ -0,0 +1,72 @@ +/** + * Snapshot builder — assembles a UTASnapshot from a live UTA. + * + * Only returns a snapshot when real data is successfully fetched. 
+ * Returns null when data cannot be obtained (offline, disabled, network error). + * Never fabricates zero-value placeholders. + */ + +import type { UnifiedTradingAccount } from '../UnifiedTradingAccount.js' +import type { UTASnapshot, SnapshotTrigger } from './types.js' + +export async function buildSnapshot( + uta: UnifiedTradingAccount, + trigger: SnapshotTrigger, +): Promise { + // Can't get real data from offline/disabled accounts + if (uta.disabled || uta.health === 'offline') return null + + try { + const pendingOrderIds = uta.git.getPendingOrderIds().map(p => p.orderId) + const [accountInfo, positions, orders] = await Promise.all([ + uta.getAccount(), + uta.getPositions(), + uta.getOrders(pendingOrderIds), + ]) + + const gitStatus = uta.git.status() + + return { + accountId: uta.id, + timestamp: new Date().toISOString(), + trigger, + account: { + netLiquidation: String(accountInfo.netLiquidation), + totalCashValue: String(accountInfo.totalCashValue), + unrealizedPnL: String(accountInfo.unrealizedPnL), + realizedPnL: String(accountInfo.realizedPnL ?? 0), + buyingPower: accountInfo.buyingPower != null ? String(accountInfo.buyingPower) : undefined, + initMarginReq: accountInfo.initMarginReq != null ? String(accountInfo.initMarginReq) : undefined, + maintMarginReq: accountInfo.maintMarginReq != null ? String(accountInfo.maintMarginReq) : undefined, + }, + positions: positions.map(p => ({ + aliceId: p.contract.aliceId ?? uta.broker.getNativeKey(p.contract), + side: p.side, + quantity: p.quantity.toString(), + avgCost: String(p.avgCost), + marketPrice: String(p.marketPrice), + marketValue: String(p.marketValue), + unrealizedPnL: String(p.unrealizedPnL), + realizedPnL: String(p.realizedPnL), + })), + openOrders: orders + .filter(o => o.orderState.status === 'Submitted' || o.orderState.status === 'PreSubmitted') + .map(o => ({ + orderId: String(o.order.orderId), + aliceId: o.contract.aliceId ?? 
uta.broker.getNativeKey(o.contract), + action: o.order.action, + orderType: o.order.orderType, + totalQuantity: o.order.totalQuantity.toString(), + limitPrice: o.order.lmtPrice != null ? String(o.order.lmtPrice) : undefined, + status: o.orderState.status, + avgFillPrice: o.avgFillPrice != null ? String(o.avgFillPrice) : undefined, + })), + health: uta.disabled ? 'disabled' : uta.health, + headCommit: gitStatus.head, + pendingCommits: gitStatus.pendingHash ? [gitStatus.pendingHash] : [], + } + } catch (err) { + console.warn(`snapshot: build failed for ${uta.id}:`, err instanceof Error ? err.message : err) + return null + } +} diff --git a/src/domain/trading/snapshot/index.ts b/src/domain/trading/snapshot/index.ts new file mode 100644 index 00000000..54d13b9f --- /dev/null +++ b/src/domain/trading/snapshot/index.ts @@ -0,0 +1,8 @@ +export { createSnapshotService } from './service.js' +export type { SnapshotService } from './service.js' +export { createSnapshotScheduler } from './scheduler.js' +export type { SnapshotScheduler, SnapshotConfig } from './scheduler.js' +export { createSnapshotStore } from './store.js' +export type { SnapshotStore } from './store.js' +export { buildSnapshot } from './builder.js' +export type { UTASnapshot, SnapshotTrigger, SnapshotIndex, SnapshotChunkEntry } from './types.js' diff --git a/src/domain/trading/snapshot/scheduler.ts b/src/domain/trading/snapshot/scheduler.ts new file mode 100644 index 00000000..98fca8b6 --- /dev/null +++ b/src/domain/trading/snapshot/scheduler.ts @@ -0,0 +1,85 @@ +/** + * Snapshot scheduler — periodic snapshots via cron engine. + * + * Registers a cron job (`__snapshot__`) and subscribes to `cron.fire` events. + * When fired, captures snapshots for all accounts. + * + * Follows the same pattern as the heartbeat system. 
+ */ + +import type { EventLog, EventLogEntry } from '../../../core/event-log.js' +import type { CronEngine, CronFirePayload } from '../../../task/cron/engine.js' +import type { SnapshotService } from './service.js' + +const SNAPSHOT_JOB_NAME = '__snapshot__' + +export interface SnapshotConfig { + enabled: boolean + every: string +} + +export interface SnapshotScheduler { + start(): Promise + stop(): void +} + +export function createSnapshotScheduler(deps: { + snapshotService: SnapshotService + cronEngine: CronEngine + eventLog: EventLog + config: SnapshotConfig +}): SnapshotScheduler { + const { snapshotService, cronEngine, eventLog, config } = deps + + let unsubscribe: (() => void) | null = null + let processing = false + + async function handleFire(entry: EventLogEntry): Promise { + const payload = entry.payload as CronFirePayload + if (payload.jobName !== SNAPSHOT_JOB_NAME) return + if (processing) return + + processing = true + try { + await snapshotService.takeAllSnapshots('scheduled') + } catch (err) { + console.warn('snapshot-scheduler: error:', err instanceof Error ? 
err.message : err) + } finally { + processing = false + } + } + + return { + async start() { + // Find or create the cron job + const existing = cronEngine.list().find(j => j.name === SNAPSHOT_JOB_NAME) + if (existing) { + await cronEngine.update(existing.id, { + schedule: { kind: 'every', every: config.every }, + enabled: config.enabled, + }) + } else { + await cronEngine.add({ + name: SNAPSHOT_JOB_NAME, + schedule: { kind: 'every', every: config.every }, + payload: '', + enabled: config.enabled, + }) + } + + // Subscribe to cron.fire events + if (!unsubscribe) { + unsubscribe = eventLog.subscribeType('cron.fire', (entry) => { + handleFire(entry).catch(err => { + console.error('snapshot-scheduler: unhandled error:', err) + }) + }) + } + }, + + stop() { + unsubscribe?.() + unsubscribe = null + }, + } +} diff --git a/src/domain/trading/snapshot/service.ts b/src/domain/trading/snapshot/service.ts new file mode 100644 index 00000000..236a7f76 --- /dev/null +++ b/src/domain/trading/snapshot/service.ts @@ -0,0 +1,105 @@ +/** + * Snapshot service — orchestrates builder + store. + * + * Only persists snapshots with real data. If the builder returns null + * (offline, network error), the snapshot is skipped — never stored. + * takeAllSnapshots retries failed accounts once after a short delay. + * + * Store instances are cached per account to ensure writes are serialized. 
+ */ + +import type { AccountManager } from '../account-manager.js' +import type { EventLog } from '../../../core/event-log.js' +import type { SnapshotStore } from './store.js' +import type { UTASnapshot, SnapshotTrigger } from './types.js' +import { buildSnapshot } from './builder.js' +import { createSnapshotStore } from './store.js' + +const RETRY_DELAY_MS = 3_000 + +export interface SnapshotService { + takeSnapshot(accountId: string, trigger: SnapshotTrigger): Promise + takeAllSnapshots(trigger: SnapshotTrigger): Promise + getRecent(accountId: string, limit?: number): Promise +} + +export function createSnapshotService(deps: { + accountManager: AccountManager + eventLog?: EventLog +}): SnapshotService { + const { accountManager, eventLog } = deps + const stores = new Map() + + function getStore(accountId: string): SnapshotStore { + let s = stores.get(accountId) + if (!s) { + s = createSnapshotStore(accountId) + stores.set(accountId, s) + } + return s + } + + return { + async takeSnapshot(accountId, trigger) { + const uta = accountManager.get(accountId) + if (!uta) return null + + try { + const snapshot = await buildSnapshot(uta, trigger) + + if (!snapshot) { + // Builder couldn't get real data — skip, don't store + await eventLog?.append('snapshot.skipped', { + accountId, + trigger, + reason: 'no-data', + }).catch(() => {}) + return null + } + + await getStore(accountId).append(snapshot) + await eventLog?.append('snapshot.taken', { + accountId, + trigger, + timestamp: snapshot.timestamp, + }) + return snapshot + } catch (err) { + const msg = err instanceof Error ? 
err.message : String(err) + console.warn(`snapshot: failed for ${accountId}:`, msg) + await eventLog?.append('snapshot.error', { accountId, trigger, error: msg }).catch(() => {}) + return null + } + }, + + async takeAllSnapshots(trigger) { + const accounts = accountManager.resolve() + + // First round — try all accounts + const results = await Promise.allSettled( + accounts.map(async uta => ({ + id: uta.id, + snap: await this.takeSnapshot(uta.id, trigger), + })), + ) + + // Collect failed account IDs (returned null) + const failed = results + .filter((r): r is PromiseFulfilledResult<{ id: string; snap: UTASnapshot | null }> => + r.status === 'fulfilled' && r.value.snap === null) + .map(r => r.value.id) + + if (failed.length === 0) return + + // Retry once after a short delay + await new Promise(r => setTimeout(r, RETRY_DELAY_MS)) + await Promise.allSettled( + failed.map(id => this.takeSnapshot(id, trigger)), + ) + }, + + async getRecent(accountId, limit = 10) { + return getStore(accountId).readRange({ limit }) + }, + } +} diff --git a/src/domain/trading/snapshot/snapshot.spec.ts b/src/domain/trading/snapshot/snapshot.spec.ts new file mode 100644 index 00000000..909cfb59 --- /dev/null +++ b/src/domain/trading/snapshot/snapshot.spec.ts @@ -0,0 +1,626 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' +import { join } from 'node:path' +import { tmpdir } from 'node:os' +import { readFile, rm } from 'node:fs/promises' +import { randomUUID } from 'node:crypto' +import Decimal from 'decimal.js' +import { Order, OrderState } from '@traderalice/ibkr' +import { UnifiedTradingAccount } from '../UnifiedTradingAccount.js' +import type { UnifiedTradingAccountOptions } from '../UnifiedTradingAccount.js' +import { MockBroker, makeContract, makePosition, makeOpenOrder } from '../brokers/mock/index.js' +import { AccountManager } from '../account-manager.js' +import { createEventLog, type EventLog } from '../../../core/event-log.js' +import { 
createCronEngine, type CronEngine } from '../../../task/cron/engine.js' +import { buildSnapshot } from './builder.js' +import { createSnapshotStore, type SnapshotStore } from './store.js' +import { createSnapshotService, type SnapshotService } from './service.js' +import { createSnapshotScheduler, type SnapshotScheduler } from './scheduler.js' +import type { UTASnapshot, SnapshotIndex } from './types.js' +import '../contract-ext.js' + +// ==================== Helpers ==================== + +function createUTA(broker?: MockBroker, options?: UnifiedTradingAccountOptions) { + const b = broker ?? new MockBroker() + const uta = new UnifiedTradingAccount(b, options) + return { uta, broker: b } +} + +function tempDir(): string { + return join(tmpdir(), `snapshot-test-${randomUUID()}`) +} + +function tempPath(ext: string): string { + return join(tmpdir(), `snapshot-test-${randomUUID()}.${ext}`) +} + +function makeSubmittedOrder(symbol = 'AAPL'): ReturnType { + const contract = makeContract({ symbol, aliceId: `mock-${symbol}` }) + const order = new Order() + order.orderId = 42 + order.action = 'BUY' + order.orderType = 'LMT' + order.totalQuantity = new Decimal(5) + order.lmtPrice = 150 + const orderState = new OrderState() + orderState.status = 'Submitted' + return { contract, order, orderState } +} + +function makeFilledOrder(symbol = 'AAPL'): ReturnType { + const o = makeSubmittedOrder(symbol) + o.orderState.status = 'Filled' + return o +} + +/** Flush microtasks so fire-and-forget callbacks complete */ +async function flush() { await vi.advanceTimersByTimeAsync(0) } + +// ==================== Builder Tests ==================== + +describe('Snapshot Builder', () => { + let uta: UnifiedTradingAccount + let broker: MockBroker + + beforeEach(() => { + ({ uta, broker } = createUTA()) + }) + + // #1 + it('builds complete snapshot from healthy UTA', async () => { + broker.setPositions([makePosition()]) + const snap = await buildSnapshot(uta, 'manual') + 
expect(snap).not.toBeNull() + + expect(snap!.accountId).toBe(broker.id) + expect(snap!.trigger).toBe('manual') + expect(snap!.health).toBe('healthy') + expect(snap!.positions).toHaveLength(1) + expect(snap!.account.netLiquidation).toBeTruthy() + expect(snap!.timestamp).toBeTruthy() + }) + + // #2 + it('stores all financial values as strings', async () => { + const snap = await buildSnapshot(uta, 'manual') + expect(snap).not.toBeNull() + + expect(typeof snap!.account.netLiquidation).toBe('string') + expect(typeof snap!.account.totalCashValue).toBe('string') + expect(typeof snap!.account.unrealizedPnL).toBe('string') + expect(typeof snap!.account.realizedPnL).toBe('string') + }) + + // #3 + it('positions use aliceId, not full contract', async () => { + const pos = makePosition({ contract: makeContract({ symbol: 'TSLA', aliceId: 'mock-TSLA' }) }) + broker.setPositions([pos]) + const snap = await buildSnapshot(uta, 'manual') + expect(snap).not.toBeNull() + + expect(snap!.positions[0].aliceId).toBe(`${broker.id}|TSLA`) + expect(snap!.positions[0]).not.toHaveProperty('contract') + expect(typeof snap!.positions[0].quantity).toBe('string') + expect(typeof snap!.positions[0].avgCost).toBe('string') + }) + + // #4 + it('only includes Submitted/PreSubmitted orders', async () => { + const contract = makeContract({ symbol: 'AAPL' }) + broker.setQuote('AAPL', 150) + + const order = new Order() + order.action = 'BUY' + order.orderType = 'LMT' + order.totalQuantity = new Decimal(5) + order.lmtPrice = 140 + order.tif = 'DAY' + + uta.git.add({ action: 'placeOrder', contract, order }) + uta.git.commit('buy limit') + await uta.push() + + const snap = await buildSnapshot(uta, 'manual') + expect(snap).not.toBeNull() + + expect(snap!.openOrders).toHaveLength(1) + expect(snap!.openOrders[0].status).toBe('Submitted') + expect(snap!.openOrders[0].orderType).toBe('LMT') + }) + + // #5 + it('returns null when UTA is disabled', async () => { + broker.setPositions([makePosition()]) + ;(uta as 
any)._disabled = true + + const snap = await buildSnapshot(uta, 'scheduled') + expect(snap).toBeNull() + }) + + // #6 + it('returns null when UTA is offline', async () => { + ;(uta as any)._consecutiveFailures = 6 + + const snap = await buildSnapshot(uta, 'scheduled') + expect(snap).toBeNull() + }) + + // #7 + it('returns null when broker query throws', async () => { + broker.setFailMode(3) + + const snap = await buildSnapshot(uta, 'manual') + expect(snap).toBeNull() + }) + + // #8 + it('captures headCommit and pendingCommits from git status', async () => { + // No commits yet + let snap = await buildSnapshot(uta, 'manual') + expect(snap).not.toBeNull() + expect(snap!.headCommit).toBeNull() + expect(snap!.pendingCommits).toEqual([]) + + // Stage and commit (but don't push) + uta.git.add({ action: 'placeOrder', contract: makeContract(), order: new Order() }) + const { hash } = uta.git.commit('test order') + + snap = await buildSnapshot(uta, 'manual') + expect(snap).not.toBeNull() + expect(snap!.pendingCommits).toEqual([hash]) + expect(snap!.headCommit).toBeNull() // not pushed yet + }) + + // #9 + it('passes trigger field correctly', async () => { + for (const trigger of ['scheduled', 'post-push', 'post-reject', 'manual'] as const) { + const snap = await buildSnapshot(uta, trigger) + expect(snap).not.toBeNull() + expect(snap!.trigger).toBe(trigger) + } + }) + + // #10 + it('omits optional fields when not available', async () => { + broker.setAccountInfo({ buyingPower: undefined, initMarginReq: undefined }) + const snap = await buildSnapshot(uta, 'manual') + expect(snap).not.toBeNull() + + expect(snap!.account.buyingPower).toBeUndefined() + expect(snap!.account.initMarginReq).toBeUndefined() + expect(snap!.account.maintMarginReq).toBeUndefined() + }) +}) + +// ==================== Store Tests ==================== + +describe('Snapshot Store', () => { + let store: SnapshotStore + let dir: string + + function makeSnapshot(overrides: Partial = {}): UTASnapshot { + 
return { + accountId: 'test-acc', + timestamp: new Date().toISOString(), + trigger: 'manual', + account: { + netLiquidation: '100000', + totalCashValue: '90000', + unrealizedPnL: '5000', + realizedPnL: '1000', + }, + positions: [], + openOrders: [], + health: 'healthy', + headCommit: null, + pendingCommits: [], + ...overrides, + } + } + + beforeEach(() => { + dir = tempDir() + store = createSnapshotStore('test-acc', { baseDir: dir }) + }) + + afterEach(async () => { + await rm(dir, { recursive: true, force: true }) + }) + + // #11 + it('creates index and first chunk on first write', async () => { + const snap = makeSnapshot() + await store.append(snap) + + const results = await store.readRange({ limit: 10 }) + expect(results).toHaveLength(1) + expect(results[0].accountId).toBe(snap.accountId) + }) + + // #12 + it('rolls over to new chunk after 50 snapshots', async () => { + // Write 51 snapshots + for (let i = 0; i < 51; i++) { + await store.append(makeSnapshot({ + timestamp: new Date(Date.now() + i * 1000).toISOString(), + })) + } + + const all = await store.readRange() + expect(all).toHaveLength(51) + }) + + // #13 + it('maintains correct index metadata', async () => { + const t1 = '2025-01-01T00:00:00.000Z' + const t2 = '2025-01-01T00:01:00.000Z' + const t3 = '2025-01-01T00:02:00.000Z' + + await store.append(makeSnapshot({ timestamp: t1 })) + await store.append(makeSnapshot({ timestamp: t2 })) + await store.append(makeSnapshot({ timestamp: t3 })) + + const results = await store.readRange() + expect(results).toHaveLength(3) + // Newest first + expect(results[0].timestamp).toBe(t3) + expect(results[2].timestamp).toBe(t1) + }) + + // #14 + it('returns snapshots in reverse chronological order', async () => { + const timestamps = ['2025-01-01T00:00:00Z', '2025-01-01T01:00:00Z', '2025-01-01T02:00:00Z'] + for (const ts of timestamps) { + await store.append(makeSnapshot({ timestamp: ts })) + } + + const results = await store.readRange() + 
expect(results[0].timestamp).toBe('2025-01-01T02:00:00Z') + expect(results[2].timestamp).toBe('2025-01-01T00:00:00Z') + }) + + // #15 + it('respects limit parameter', async () => { + for (let i = 0; i < 10; i++) { + await store.append(makeSnapshot({ timestamp: new Date(Date.now() + i * 1000).toISOString() })) + } + + const results = await store.readRange({ limit: 3 }) + expect(results).toHaveLength(3) + }) + + // #16 + it('filters by time range', async () => { + await store.append(makeSnapshot({ timestamp: '2025-01-01T00:00:00Z' })) + await store.append(makeSnapshot({ timestamp: '2025-01-02T00:00:00Z' })) + await store.append(makeSnapshot({ timestamp: '2025-01-03T00:00:00Z' })) + + const results = await store.readRange({ + startTime: '2025-01-01T12:00:00Z', + endTime: '2025-01-02T12:00:00Z', + }) + + expect(results).toHaveLength(1) + expect(results[0].timestamp).toBe('2025-01-02T00:00:00Z') + }) + + // #17 (verifying correct total across chunk boundary) + it('reads across multiple chunks correctly', async () => { + // Write 60 snapshots (chunk 1: 50, chunk 2: 10) + for (let i = 0; i < 60; i++) { + await store.append(makeSnapshot({ + timestamp: new Date(Date.now() + i * 1000).toISOString(), + })) + } + + const all = await store.readRange() + expect(all).toHaveLength(60) + }) + + // #18 + it('returns empty array for empty store', async () => { + const results = await store.readRange() + expect(results).toEqual([]) + }) + + // #19 + it('handles concurrent appends safely via write lock', async () => { + // Fire two appends concurrently + const p1 = store.append(makeSnapshot({ timestamp: '2025-01-01T00:00:01Z' })) + const p2 = store.append(makeSnapshot({ timestamp: '2025-01-01T00:00:02Z' })) + await Promise.all([p1, p2]) + + const results = await store.readRange() + expect(results).toHaveLength(2) + }) +}) + +// ==================== Service Tests ==================== + +describe('Snapshot Service', () => { + let manager: AccountManager + let eventLog: EventLog + let 
service: SnapshotService + + beforeEach(async () => { + manager = new AccountManager() + const logPath = tempPath('jsonl') + eventLog = await createEventLog({ logPath }) + + const broker = new MockBroker({ id: 'acc1', label: 'Test' }) + const uta = new UnifiedTradingAccount(broker) + manager.add(uta) + + service = createSnapshotService({ accountManager: manager, eventLog }) + }) + + afterEach(async () => { + await eventLog._resetForTest() + }) + + // #20 + it('takes snapshot and logs event', async () => { + const snap = await service.takeSnapshot('acc1', 'manual') + + expect(snap).not.toBeNull() + expect(snap!.accountId).toBe('acc1') + expect(snap!.trigger).toBe('manual') + + // Check eventLog has snapshot.taken + const events = eventLog.recent({ type: 'snapshot.taken' }) + expect(events).toHaveLength(1) + expect(events[0].payload).toMatchObject({ + accountId: 'acc1', + trigger: 'manual', + }) + }) + + // #21 + it('returns null for unknown account', async () => { + const snap = await service.takeSnapshot('nonexistent', 'manual') + expect(snap).toBeNull() + }) + + // #22 + it('returns null and logs snapshot.skipped when builder fails', async () => { + const uta = manager.get('acc1')! 
+ vi.spyOn(uta, 'getAccount').mockRejectedValue(new Error('network timeout')) + + const snap = await service.takeSnapshot('acc1', 'scheduled') + + expect(snap).toBeNull() + // Should log skipped, not store anything + const skipped = eventLog.recent({ type: 'snapshot.skipped' }) + expect(skipped).toHaveLength(1) + expect(skipped[0].payload).toMatchObject({ accountId: 'acc1', reason: 'no-data' }) + }) + + // #23 + it('takeAllSnapshots captures all accounts', async () => { + const broker2 = new MockBroker({ id: 'acc2', label: 'Test2' }) + manager.add(new UnifiedTradingAccount(broker2)) + + await service.takeAllSnapshots('scheduled') + + const events = eventLog.recent({ type: 'snapshot.taken' }) + expect(events).toHaveLength(2) + const ids = events.map(e => (e.payload as any).accountId) + expect(ids).toContain('acc1') + expect(ids).toContain('acc2') + }) + + // #24 + it('takeAllSnapshots: single failure does not affect others', async () => { + const broker2 = new MockBroker({ id: 'acc2', label: 'Failing' }) + const uta2 = new UnifiedTradingAccount(broker2) + manager.add(uta2) + // Make uta2 disabled — builder returns null, not stored + ;(uta2 as any)._disabled = true + + await service.takeAllSnapshots('scheduled') + + // Only acc1 gets stored; acc2 is skipped (disabled → null) + const taken = eventLog.recent({ type: 'snapshot.taken' }) + expect(taken).toHaveLength(1) + expect((taken[0].payload as any).accountId).toBe('acc1') + + // acc2 should be skipped + const skipped = eventLog.recent({ type: 'snapshot.skipped' }) + expect(skipped.length).toBeGreaterThanOrEqual(1) + }) + + // #25 + it('getRecent delegates to store', async () => { + // Take a snapshot first + await service.takeSnapshot('acc1', 'manual') + await service.takeSnapshot('acc1', 'scheduled') + + const recent = await service.getRecent('acc1', 1) + expect(recent).toHaveLength(1) + }) +}) + +// ==================== Scheduler Tests ==================== + +describe('Snapshot Scheduler', () => { + let eventLog: 
EventLog + let cronEngine: CronEngine + let scheduler: SnapshotScheduler + let mockService: SnapshotService + + beforeEach(async () => { + const logPath = tempPath('jsonl') + const storePath = tempPath('json') + eventLog = await createEventLog({ logPath }) + cronEngine = createCronEngine({ eventLog, storePath }) + await cronEngine.start() + + mockService = { + takeSnapshot: vi.fn(async () => null), + takeAllSnapshots: vi.fn(async () => {}), + getRecent: vi.fn(async () => []), + } + + scheduler = createSnapshotScheduler({ + snapshotService: mockService, + cronEngine, + eventLog, + config: { enabled: true, every: '15m' }, + }) + }) + + afterEach(async () => { + scheduler?.stop() + cronEngine.stop() + await eventLog._resetForTest() + }) + + // #26 + it('registers __snapshot__ cron job on start', async () => { + await scheduler.start() + + const jobs = cronEngine.list() + const snapshotJob = jobs.find(j => j.name === '__snapshot__') + expect(snapshotJob).toBeDefined() + expect(snapshotJob!.enabled).toBe(true) + }) + + // #27 + it('reuses existing job on repeated start (idempotent)', async () => { + await scheduler.start() + const jobsBefore = cronEngine.list().filter(j => j.name === '__snapshot__') + + await scheduler.start() + const jobsAfter = cronEngine.list().filter(j => j.name === '__snapshot__') + + expect(jobsBefore).toHaveLength(1) + expect(jobsAfter).toHaveLength(1) + expect(jobsBefore[0].id).toBe(jobsAfter[0].id) + }) + + // #28 + it('fires takeAllSnapshots on cron.fire event', async () => { + await scheduler.start() + + // Trigger the cron job manually + const job = cronEngine.list().find(j => j.name === '__snapshot__')! 
+ await cronEngine.runNow(job.id) + + // Give the async handler time to complete + await new Promise(r => setTimeout(r, 50)) + + expect(mockService.takeAllSnapshots).toHaveBeenCalledWith('scheduled') + }) + + // #29 + it('ignores cron.fire for other jobs', async () => { + await scheduler.start() + + // Create a different job and fire it + const otherId = await cronEngine.add({ + name: 'other-job', + schedule: { kind: 'every', every: '1h' }, + payload: '', + }) + await cronEngine.runNow(otherId) + + await new Promise(r => setTimeout(r, 50)) + + expect(mockService.takeAllSnapshots).not.toHaveBeenCalled() + }) + + // #30 + it('processing lock prevents concurrent fires', async () => { + // Make takeAllSnapshots slow + let resolveFirst: () => void + const firstCall = new Promise(r => { resolveFirst = r }) + ;(mockService.takeAllSnapshots as any).mockImplementationOnce(async () => { + await firstCall + }) + + await scheduler.start() + const job = cronEngine.list().find(j => j.name === '__snapshot__')! + + // Fire twice quickly + await cronEngine.runNow(job.id) + await new Promise(r => setTimeout(r, 10)) + await cronEngine.runNow(job.id) + await new Promise(r => setTimeout(r, 10)) + + // Second fire should be skipped (processing=true) + resolveFirst!() + await new Promise(r => setTimeout(r, 50)) + + expect(mockService.takeAllSnapshots).toHaveBeenCalledTimes(1) + }) + + // #31 + it('stop() unsubscribes from events', async () => { + await scheduler.start() + scheduler.stop() + + const job = cronEngine.list().find(j => j.name === '__snapshot__')! 
+ await cronEngine.runNow(job.id) + await new Promise(r => setTimeout(r, 50)) + + expect(mockService.takeAllSnapshots).not.toHaveBeenCalled() + }) +}) + +// ==================== UTA Hook Tests ==================== + +describe('UTA — post-push/reject hooks', () => { + // #32 + it('calls onPostPush after successful push', async () => { + const onPostPush = vi.fn() + const { uta, broker } = createUTA(undefined, { onPostPush }) + + broker.setQuote('AAPL', 150) + uta.git.add({ action: 'placeOrder', contract: makeContract(), order: new Order() }) + uta.git.commit('buy') + await uta.push() + + // fire-and-forget, but should be called + await new Promise(r => setTimeout(r, 10)) + expect(onPostPush).toHaveBeenCalledWith(uta.id) + }) + + // #33 + it('calls onPostReject after reject', async () => { + const onPostReject = vi.fn() + const { uta } = createUTA(undefined, { onPostReject }) + + uta.git.add({ action: 'placeOrder', contract: makeContract(), order: new Order() }) + uta.git.commit('buy') + await uta.reject('changed mind') + + await new Promise(r => setTimeout(r, 10)) + expect(onPostReject).toHaveBeenCalledWith(uta.id) + }) + + // #34 + it('does not call hook when push fails (disabled)', async () => { + const onPostPush = vi.fn() + const { uta } = createUTA(undefined, { onPostPush }) + ;(uta as any)._disabled = true + + uta.git.add({ action: 'placeOrder', contract: makeContract(), order: new Order() }) + uta.git.commit('buy') + + await expect(uta.push()).rejects.toThrow() + expect(onPostPush).not.toHaveBeenCalled() + }) + + // #35 + it('hook error does not affect push return value', async () => { + const onPostPush = vi.fn().mockRejectedValue(new Error('hook failed')) + const { uta } = createUTA(undefined, { onPostPush }) + + uta.git.add({ action: 'placeOrder', contract: makeContract(), order: new Order() }) + uta.git.commit('buy') + const result = await uta.push() + + expect(result).toBeDefined() + expect(result.hash).toBeTruthy() + }) +}) diff --git 
a/src/domain/trading/snapshot/store.ts b/src/domain/trading/snapshot/store.ts
new file mode 100644
index 00000000..10927908
--- /dev/null
+++ b/src/domain/trading/snapshot/store.ts
@@ -0,0 +1,123 @@
+/**
+ * Snapshot store — chunked JSONL persistence with index.
+ *
+ * Storage layout:
+ *   data/trading/{accountId}/snapshots/
+ *   ├── index.json
+ *   ├── chunk-0001.jsonl
+ *   ├── chunk-0002.jsonl
+ *   └── ...
+ *
+ * Each chunk holds up to CHUNK_SIZE snapshots (one JSON line each).
+ * The index tracks chunk metadata for efficient time-range queries.
+ *
+ * Writes are serialized via a Promise chain to prevent concurrent
+ * appends from corrupting the index.
+ */
+
+import { readFile, writeFile, appendFile, rename, mkdir } from 'node:fs/promises'
+import { resolve } from 'node:path'
+import type { UTASnapshot, SnapshotIndex } from './types.js'
+
+const CHUNK_SIZE = 50
+const DEFAULT_BASE_DIR = 'data/trading'
+
+export interface SnapshotStoreOptions {
+  baseDir?: string
+}
+
+export interface SnapshotStore {
+  append(snapshot: UTASnapshot): Promise<void>
+  readRange(opts?: { startTime?: string; endTime?: string; limit?: number }): Promise<UTASnapshot[]>
+}
+
+export function createSnapshotStore(accountId: string, options?: SnapshotStoreOptions): SnapshotStore {
+  const dir = resolve(options?.baseDir ??
DEFAULT_BASE_DIR, accountId, 'snapshots')
+  const indexPath = resolve(dir, 'index.json')
+
+  // Serialize writes to prevent concurrent append from corrupting the index
+  let writeChain = Promise.resolve()
+
+  async function readIndex(): Promise<SnapshotIndex> {
+    try {
+      const raw = await readFile(indexPath, 'utf-8')
+      return JSON.parse(raw) as SnapshotIndex
+    } catch {
+      return { version: 1, chunks: [] }
+    }
+  }
+
+  async function saveIndex(index: SnapshotIndex): Promise<void> {
+    await mkdir(dir, { recursive: true })
+    const tmp = `${indexPath}.${process.pid}.tmp`
+    await writeFile(tmp, JSON.stringify(index, null, 2), 'utf-8')
+    await rename(tmp, indexPath)
+  }
+
+  function chunkName(n: number): string {
+    return `chunk-${String(n).padStart(4, '0')}.jsonl`
+  }
+
+  async function doAppend(snapshot: UTASnapshot): Promise<void> {
+    const index = await readIndex()
+    const last = index.chunks[index.chunks.length - 1]
+
+    let chunkFile: string
+    if (!last || last.count >= CHUNK_SIZE) {
+      const nextNum = index.chunks.length + 1
+      chunkFile = chunkName(nextNum)
+      index.chunks.push({
+        file: chunkFile,
+        count: 1,
+        startTime: snapshot.timestamp,
+        endTime: snapshot.timestamp,
+      })
+    } else {
+      chunkFile = last.file
+      last.count += 1
+      last.endTime = snapshot.timestamp
+    }
+
+    await mkdir(dir, { recursive: true })
+    await appendFile(resolve(dir, chunkFile), JSON.stringify(snapshot) + '\n', 'utf-8')
+    await saveIndex(index)
+  }
+
+  return {
+    append(snapshot) {
+      const p = writeChain.then(() => doAppend(snapshot))
+      // Always advance chain even on error, so next write isn't blocked
+      writeChain = p.catch(() => {})
+      return p
+    },

+    async readRange(opts) {
+      const index = await readIndex()
+      const { startTime, endTime, limit } = opts ??
{} + const results: UTASnapshot[] = [] + + // Walk chunks in reverse (newest first) + for (let i = index.chunks.length - 1; i >= 0; i--) { + const chunk = index.chunks[i] + + // Skip chunks outside time range + if (startTime && chunk.endTime < startTime) continue + if (endTime && chunk.startTime > endTime) continue + + const raw = await readFile(resolve(dir, chunk.file), 'utf-8') + const lines = raw.trim().split('\n').filter(Boolean) + + // Parse in reverse (newest first within chunk) + for (let j = lines.length - 1; j >= 0; j--) { + const snap = JSON.parse(lines[j]) as UTASnapshot + if (startTime && snap.timestamp < startTime) continue + if (endTime && snap.timestamp > endTime) continue + results.push(snap) + if (limit && results.length >= limit) return results + } + } + + return results + }, + } +} diff --git a/src/domain/trading/snapshot/types.ts b/src/domain/trading/snapshot/types.ts new file mode 100644 index 00000000..1d0affcd --- /dev/null +++ b/src/domain/trading/snapshot/types.ts @@ -0,0 +1,68 @@ +/** + * UTA Snapshot types. + * + * Captures the full state of a UnifiedTradingAccount at a point in time, + * independently of trading operations (TradingGit commits). + * + * All financial values are stored as strings to avoid floating-point precision loss. 
+ */ + +// ==================== Snapshot ==================== + +export type SnapshotTrigger = 'scheduled' | 'post-push' | 'post-reject' | 'manual' + +export interface UTASnapshot { + accountId: string + timestamp: string + trigger: SnapshotTrigger + + account: { + netLiquidation: string + totalCashValue: string + unrealizedPnL: string + realizedPnL: string + buyingPower?: string + initMarginReq?: string + maintMarginReq?: string + } + + positions: Array<{ + aliceId: string + side: 'long' | 'short' + quantity: string + avgCost: string + marketPrice: string + marketValue: string + unrealizedPnL: string + realizedPnL: string + }> + + openOrders: Array<{ + orderId: string + aliceId: string + action: string + orderType: string + totalQuantity: string + limitPrice?: string + status: string + avgFillPrice?: string + }> + + health: 'healthy' | 'degraded' | 'offline' | 'disabled' + headCommit: string | null + pendingCommits: string[] +} + +// ==================== Storage ==================== + +export interface SnapshotChunkEntry { + file: string + count: number + startTime: string + endTime: string +} + +export interface SnapshotIndex { + version: 1 + chunks: SnapshotChunkEntry[] +} diff --git a/src/main.ts b/src/main.ts index d7389228..03192a5e 100644 --- a/src/main.ts +++ b/src/main.ts @@ -8,7 +8,7 @@ import { TelegramPlugin } from './connectors/telegram/index.js' import { WebPlugin } from './connectors/web/index.js' import { McpAskPlugin } from './connectors/mcp-ask/index.js' import { createThinkingTools } from './tool/thinking.js' -import { AccountManager } from './domain/trading/index.js' +import { AccountManager, createSnapshotService, createSnapshotScheduler } from './domain/trading/index.js' import { createTradingTools } from './tool/trading.js' import { Brain } from './domain/brain/index.js' import { createBrainTools } from './tool/brain.js' @@ -84,6 +84,14 @@ async function main() { } accountManager.registerCcxtToolsIfNeeded() + // ==================== Snapshot 
==================== + + const snapshotService = createSnapshotService({ accountManager, eventLog }) + accountManager.setSnapshotHooks({ + onPostPush: (id) => { snapshotService.takeSnapshot(id, 'post-push') }, + onPostReject: (id) => { snapshotService.takeSnapshot(id, 'post-reject') }, + }) + // ==================== Brain ==================== const [brainExport, persona] = await Promise.all([ @@ -218,6 +226,14 @@ async function main() { cronListener.start() console.log('cron: engine + listener started') + // ==================== Snapshot Scheduler ==================== + + const snapshotScheduler = createSnapshotScheduler({ snapshotService, cronEngine, eventLog, config: config.snapshot }) + await snapshotScheduler.start() + if (config.snapshot.enabled) { + console.log(`snapshot: scheduler started (every ${config.snapshot.every})`) + } + // ==================== Heartbeat ==================== const heartbeat = createHeartbeat({ @@ -357,7 +373,7 @@ async function main() { const ctx: EngineContext = { config, connectorCenter, agentCenter, eventLog, toolCallLog, heartbeat, cronEngine, toolCenter, - accountManager, + accountManager, snapshotService, reconnectConnectors, } @@ -374,6 +390,7 @@ async function main() { const shutdown = async () => { stopped = true newsCollector?.stop() + snapshotScheduler.stop() heartbeat.stop() cronListener.stop() cronEngine.stop() diff --git a/src/task/cron/listener.ts b/src/task/cron/listener.ts index 61c2b6cf..1eaad80e 100644 --- a/src/task/cron/listener.ts +++ b/src/task/cron/listener.ts @@ -16,7 +16,10 @@ import type { AgentCenter } from '../../core/agent-center.js' import { SessionStore } from '../../core/session.js' import type { ConnectorCenter } from '../../core/connector-center.js' import type { CronFirePayload } from './engine.js' -import { HEARTBEAT_JOB_NAME } from '../heartbeat/heartbeat.js' +/** Internal jobs (prefixed with __) have dedicated handlers and should not be routed to the AI. 
*/ +function isInternalJob(name: string): boolean { + return name.startsWith('__') && name.endsWith('__') +} // ==================== Types ==================== @@ -45,8 +48,8 @@ export function createCronListener(opts: CronListenerOpts): CronListener { async function handleFire(entry: EventLogEntry): Promise { const payload = entry.payload as CronFirePayload - // Guard: heartbeat events are handled by the heartbeat listener - if (payload.jobName === HEARTBEAT_JOB_NAME) return + // Guard: internal jobs (__heartbeat__, __snapshot__, etc.) have dedicated handlers + if (isInternalJob(payload.jobName)) return // Guard: skip if already processing (serial execution) if (processing) { diff --git a/ui/package.json b/ui/package.json index d6ced5c6..ab04de7e 100644 --- a/ui/package.json +++ b/ui/package.json @@ -14,7 +14,8 @@ "marked-highlight": "^2.2.1", "react": "^19.1.0", "react-dom": "^19.1.0", - "react-router-dom": "^7.13.1" + "react-router-dom": "^7.13.1", + "recharts": "^3.8.0" }, "devDependencies": { "@tailwindcss/vite": "^4.1.8", diff --git a/ui/src/api/index.ts b/ui/src/api/index.ts index f71b3ce3..84fcdbad 100644 --- a/ui/src/api/index.ts +++ b/ui/src/api/index.ts @@ -52,6 +52,8 @@ export type { NewsCollectorFeed, ToolCallRecord, LoginMethod, + UTASnapshotSummary, + EquityCurvePoint, } from './types' export type { EventQueryResult } from './events' export type { ToolCallQueryResult } from './agentStatus' diff --git a/ui/src/api/trading.ts b/ui/src/api/trading.ts index a6e35e89..059e4f56 100644 --- a/ui/src/api/trading.ts +++ b/ui/src/api/trading.ts @@ -1,5 +1,5 @@ import { fetchJson } from './client' -import type { TradingAccount, AccountSummary, AccountInfo, Position, WalletCommitLog, ReconnectResult, AccountConfig, WalletStatus, WalletPushResult, WalletRejectResult, TestConnectionResult, BrokerTypeInfo } from './types' +import type { TradingAccount, AccountSummary, AccountInfo, Position, WalletCommitLog, ReconnectResult, AccountConfig, WalletStatus, 
WalletPushResult, WalletRejectResult, TestConnectionResult, BrokerTypeInfo, UTASnapshotSummary, EquityCurvePoint } from './types' // ==================== Unified Trading API ==================== @@ -112,6 +112,26 @@ export const tradingApi = { } }, + // ==================== Snapshots ==================== + + async snapshots(accountId: string, opts?: { limit?: number; startTime?: string; endTime?: string }): Promise<{ snapshots: UTASnapshotSummary[] }> { + const params = new URLSearchParams() + if (opts?.limit) params.set('limit', String(opts.limit)) + if (opts?.startTime) params.set('startTime', opts.startTime) + if (opts?.endTime) params.set('endTime', opts.endTime) + return fetchJson(`/api/trading/accounts/${accountId}/snapshots?${params}`) + }, + + async equityCurve(opts?: { startTime?: string; endTime?: string; limit?: number }): Promise<{ points: EquityCurvePoint[] }> { + const params = new URLSearchParams() + if (opts?.limit) params.set('limit', String(opts.limit)) + if (opts?.startTime) params.set('startTime', opts.startTime) + if (opts?.endTime) params.set('endTime', opts.endTime) + return fetchJson(`/api/trading/snapshots/equity-curve?${params}`) + }, + + // ==================== Connection Testing ==================== + async testConnection(account: AccountConfig): Promise { const res = await fetch('/api/trading/config/test-connection', { method: 'POST', diff --git a/ui/src/api/types.ts b/ui/src/api/types.ts index 2a7d5f49..7d9ab8db 100644 --- a/ui/src/api/types.ts +++ b/ui/src/api/types.ts @@ -79,6 +79,10 @@ export interface AppConfig { prompt: string activeHours: { start: string; end: string; timezone: string } | null } + snapshot: { + enabled: boolean + every: string + } connectors: ConnectorsConfig [key: string]: unknown } @@ -317,3 +321,45 @@ export interface TestConnectionResult { error?: string account?: unknown } + +// ==================== Snapshots ==================== + +export interface UTASnapshotSummary { + accountId: string + timestamp: 
string + trigger: string + account: { + netLiquidation: string + totalCashValue: string + unrealizedPnL: string + realizedPnL: string + buyingPower?: string + initMarginReq?: string + maintMarginReq?: string + } + positions: Array<{ + aliceId: string + side: 'long' | 'short' + quantity: string + avgCost: string + marketPrice: string + marketValue: string + unrealizedPnL: string + realizedPnL: string + }> + openOrders: Array<{ + orderId: string + aliceId: string + action: string + orderType: string + totalQuantity: string + status: string + }> + health: string +} + +export interface EquityCurvePoint { + timestamp: string + equity: string + accounts: Record +} diff --git a/ui/src/components/EquityCurve.tsx b/ui/src/components/EquityCurve.tsx new file mode 100644 index 00000000..97f7dac9 --- /dev/null +++ b/ui/src/components/EquityCurve.tsx @@ -0,0 +1,215 @@ +import { useState, useMemo } from 'react' +import { + AreaChart, Area, XAxis, YAxis, Tooltip, ResponsiveContainer, ReferenceLine, +} from 'recharts' +import type { EquityCurvePoint } from '../api' + +// ==================== Time ranges ==================== + +const RANGES = [ + { label: '1H', ms: 60 * 60 * 1000 }, + { label: '6H', ms: 6 * 60 * 60 * 1000 }, + { label: '24H', ms: 24 * 60 * 60 * 1000 }, + { label: '7D', ms: 7 * 24 * 60 * 60 * 1000 }, + { label: '30D', ms: 30 * 24 * 60 * 60 * 1000 }, + { label: 'All', ms: 0 }, +] as const + +// ==================== Props ==================== + +interface EquityCurveProps { + points: EquityCurvePoint[] + accounts: Array<{ id: string; label: string }> + selectedAccountId: string | 'all' + onAccountChange: (id: string | 'all') => void + onPointClick?: (point: EquityCurvePoint) => void + selectedTimestamp?: string | null +} + +// ==================== Component ==================== + +export function EquityCurve({ + points, accounts, selectedAccountId, onAccountChange, + onPointClick, selectedTimestamp, +}: EquityCurveProps) { + const [range, setRange] = useState('24H') + 
+ const filtered = useMemo(() => { + const r = RANGES.find(r => r.label === range) + if (!r || r.ms === 0) return points + const cutoff = Date.now() - r.ms + return points.filter(p => new Date(p.timestamp).getTime() >= cutoff) + }, [points, range]) + + // Convert to chart data + const chartData = useMemo(() => + filtered.map(p => ({ + ...p, + time: new Date(p.timestamp).getTime(), + equityNum: Number(p.equity), + })), + [filtered]) + + if (chartData.length === 0) return null + + const isAllView = selectedAccountId === 'all' + + return ( +
+ {/* Header */} +
+

+ Equity Curve +

+
+ {RANGES.map(r => ( + + ))} +
+
+ + {/* Account switcher */} + {accounts.length > 1 && ( +
+ {accounts.map(a => ( + + ))} + +
+ )} + + {/* Chart */} + + { + if (e?.activePayload?.[0]?.payload && onPointClick) { + onPointClick(e.activePayload[0].payload as EquityCurvePoint) + } + }} + > + + + + + + + + + } /> + + {selectedTimestamp && ( + + )} + + +
+ ) +} + +// ==================== Custom Tooltip ==================== + +function CustomTooltip({ active, payload, isAllView, accounts }: any) { + if (!active || !payload?.[0]) return null + const data = payload[0].payload as EquityCurvePoint & { time: number } + const accountMap = new Map((accounts as Array<{ id: string; label: string }>).map(a => [a.id, a.label])) + + return ( +
+

+ {new Date(data.time).toLocaleString()} +

+

+ ${Number(data.equity).toLocaleString('en-US', { minimumFractionDigits: 2, maximumFractionDigits: 2 })} +

+ {isAllView && data.accounts && Object.keys(data.accounts).length > 1 && ( +
+ {Object.entries(data.accounts).map(([id, val]) => ( +
+ {accountMap.get(id) ?? id} + + ${Number(val).toLocaleString('en-US', { minimumFractionDigits: 2, maximumFractionDigits: 2 })} + +
+ ))} +
+ )} +
+ ) +} + +// ==================== Formatters ==================== + +function formatTime(ts: number): string { + const d = new Date(ts) + const now = new Date() + if (d.toDateString() === now.toDateString()) { + return d.toLocaleTimeString('en-US', { hour: '2-digit', minute: '2-digit' }) + } + return d.toLocaleDateString('en-US', { month: 'short', day: 'numeric' }) + + ' ' + d.toLocaleTimeString('en-US', { hour: '2-digit', minute: '2-digit' }) +} + +function formatCurrency(val: number): string { + if (val >= 1_000_000) return `$${(val / 1_000_000).toFixed(1)}M` + if (val >= 1_000) return `$${(val / 1_000).toFixed(1)}K` + return `$${val.toFixed(0)}` +} diff --git a/ui/src/components/SnapshotDetail.tsx b/ui/src/components/SnapshotDetail.tsx new file mode 100644 index 00000000..290e2014 --- /dev/null +++ b/ui/src/components/SnapshotDetail.tsx @@ -0,0 +1,167 @@ +import type { UTASnapshotSummary } from '../api' + +// ==================== Props ==================== + +interface SnapshotDetailProps { + snapshot: UTASnapshotSummary + onClose: () => void +} + +// ==================== Component ==================== + +export function SnapshotDetail({ snapshot, onClose }: SnapshotDetailProps) { + const a = snapshot.account + + return ( +
+ {/* Header */} +
+
+ + + {new Date(snapshot.timestamp).toLocaleString()} + + + {snapshot.accountId} +
+ +
+ + {/* Account Summary */} +
+ + + + +
+ + {/* Positions */} + {snapshot.positions.length > 0 && ( +
+

+ Positions ({snapshot.positions.length}) +

+
+ + + + + + + + + + + + + {snapshot.positions.map((p, i) => { + const pnl = Number(p.unrealizedPnL) + return ( + + + + + + + + + ) + })} + +
SymbolQtyAvg CostMkt PriceMkt ValuePnL
+ {symbolFromAliceId(p.aliceId)} + + {p.side} + + {p.quantity}{fmtStr(p.avgCost)}{fmtStr(p.marketPrice)}{fmtStr(p.marketValue)}= 0 ? 'text-green' : 'text-red'}`}> + {fmtPnlStr(p.unrealizedPnL)} +
+
+
+ )} + + {/* Open Orders */} + {snapshot.openOrders.length > 0 && ( +
+

+ Open Orders ({snapshot.openOrders.length}) +

+
+ {snapshot.openOrders.map((o, i) => ( +
+ {o.action} + {symbolFromAliceId(o.aliceId)} + {o.totalQuantity} @ {o.orderType} + {o.status} +
+ ))} +
+
+ )} + + {/* Empty state */} + {snapshot.positions.length === 0 && snapshot.openOrders.length === 0 && ( +
+

No positions or orders at this time.

+
+ )} +
+ ) +} + +// ==================== Sub-components ==================== + +function HealthDot({ health }: { health: string }) { + const color = health === 'healthy' ? 'bg-green' + : health === 'degraded' ? 'bg-yellow-400' + : health === 'disabled' ? 'bg-text-muted/40' + : 'bg-red' + return
+} + +function TriggerBadge({ trigger }: { trigger: string }) { + const label = trigger === 'post-push' ? 'push' + : trigger === 'post-reject' ? 'reject' + : trigger + return ( + + {label} + + ) +} + +function MetricItem({ label, value, pnl }: { label: string; value: string; pnl?: number }) { + const color = pnl == null ? 'text-text' : pnl >= 0 ? 'text-green' : 'text-red' + return ( +
+

{label}

+

{value}

+
+ ) +} + +// ==================== Helpers ==================== + +/** Extract symbol from aliceId like "mock-paper|AAPL" → "AAPL" */ +function symbolFromAliceId(aliceId: string): string { + const parts = aliceId.split('|') + return parts[parts.length - 1] +} + +function fmtStr(s: string): string { + const n = Number(s) + if (isNaN(n)) return s + return `$${n.toLocaleString('en-US', { minimumFractionDigits: 2, maximumFractionDigits: 2 })}` +} + +function fmtPnlStr(s: string): string { + const n = Number(s) + if (isNaN(n)) return s + const sign = n >= 0 ? '+' : '' + return `${sign}$${n.toLocaleString('en-US', { minimumFractionDigits: 2, maximumFractionDigits: 2 })}` +} diff --git a/ui/src/pages/PortfolioPage.tsx b/ui/src/pages/PortfolioPage.tsx index 63189aa3..847aa1a3 100644 --- a/ui/src/pages/PortfolioPage.tsx +++ b/ui/src/pages/PortfolioPage.tsx @@ -1,8 +1,12 @@ -import { useState, useEffect, useCallback } from 'react' -import { api, type Position, type WalletCommitLog } from '../api' +import { useState, useEffect, useCallback, useMemo } from 'react' +import { api, type Position, type WalletCommitLog, type EquityCurvePoint, type UTASnapshotSummary } from '../api' +import { useAutoSave } from '../hooks/useAutoSave' import { useAccountHealth } from '../hooks/useAccountHealth' import { PageHeader } from '../components/PageHeader' import { EmptyState } from '../components/StateViews' +import { EquityCurve } from '../components/EquityCurve' +import { SnapshotDetail } from '../components/SnapshotDetail' +import { Toggle } from '../components/Toggle' // ==================== Types ==================== @@ -37,14 +41,57 @@ export function PortfolioPage() { const [data, setData] = useState(EMPTY) const [loading, setLoading] = useState(true) const [lastRefresh, setLastRefresh] = useState(null) + const [curvePoints, setCurvePoints] = useState([]) + const [curveAccountId, setCurveAccountId] = useState('') // '' = not yet initialized + const [selectedTimestamp, 
setSelectedTimestamp] = useState<string | null>(null)
+  const [selectedSnapshot, setSelectedSnapshot] = useState<UTASnapshotSummary | null>(null)
+  const [snapshotEnabled, setSnapshotEnabled] = useState(true)
+  const [snapshotEvery, setSnapshotEvery] = useState('15m')
+
+  const snapshotConfig = useMemo(() => ({ enabled: snapshotEnabled, every: snapshotEvery }), [snapshotEnabled, snapshotEvery])
+  const saveSnapshotConfig = useCallback(async (d: Record<string, unknown>) => {
+    await api.config.updateSection('snapshot', d)
+  }, [])
+  const { status: snapshotSaveStatus } = useAutoSave({ data: snapshotConfig, save: saveSnapshotConfig })
+
+  // Fetch curve data for a specific account or all
+  const fetchCurveData = useCallback(async (accountId: string | 'all') => {
+    if (accountId === 'all') {
+      const result = await api.trading.equityCurve({ limit: 200 }).catch(() => ({ points: [] }))
+      return result.points
+    }
+    // Single account — fetch its snapshots and convert to EquityCurvePoint format
+    const { snapshots } = await api.trading.snapshots(accountId, { limit: 200 }).catch(() => ({ snapshots: [] as UTASnapshotSummary[] }))
+    return snapshots
+      .sort((a, b) => a.timestamp.localeCompare(b.timestamp))
+      .map(s => ({
+        timestamp: s.timestamp,
+        equity: s.account.netLiquidation,
+        accounts: { [accountId]: s.account.netLiquidation },
+      }))
+  }, [])

   const refresh = useCallback(async () => {
     setLoading(true)
-    const result = await fetchPortfolioData()
+    const [result, configResult] = await Promise.all([
+      fetchPortfolioData(),
+      api.config.load().catch(() => null),
+    ])
     setData(result)
+    if (configResult?.snapshot) {
+      setSnapshotEnabled(configResult.snapshot.enabled)
+      setSnapshotEvery(configResult.snapshot.every)
+    }
+
+    // Default to first account on initial load
+    const effectiveId = curveAccountId || result.accounts[0]?.id || 'all'
+    if (!curveAccountId && effectiveId) setCurveAccountId(effectiveId)
+    const points = await fetchCurveData(effectiveId)
+    setCurvePoints(points)
+
     setLastRefresh(new Date())
     setLoading(false)
-  }, [])
+  }, [curveAccountId, fetchCurveData])

   useEffect(() => { refresh() }, [refresh])
@@ -61,6 +108,29 @@ export function PortfolioPage() {
       a.walletLog.map(c => ({ ...c, accountLabel: a.label, accountProvider: a.provider })),
     )

+  // Account list for the chart switcher
+  const chartAccounts = data.accounts.map(a => ({ id: a.id, label: a.label }))
+
+  const handleAccountChange = useCallback(async (id: string | 'all') => {
+    setCurveAccountId(id)
+    setSelectedSnapshot(null)
+    setSelectedTimestamp(null)
+    const points = await fetchCurveData(id)
+    setCurvePoints(points)
+  }, [fetchCurveData])
+
+  const handlePointClick = useCallback(async (point: EquityCurvePoint) => {
+    setSelectedTimestamp(point.timestamp)
+    const accountId = curveAccountId !== 'all' ? curveAccountId : Object.keys(point.accounts)[0]
+    if (!accountId) return
+    try {
+      // Fetch the snapshot at (or just before) the clicked point — without
+      // endTime this would always return the newest snapshot instead.
+      const { snapshots } = await api.trading.snapshots(accountId, { endTime: point.timestamp, limit: 1 })
+      if (snapshots.length > 0) setSelectedSnapshot(snapshots[0])
+    } catch {
+      // Ignore — snapshot fetch failed
+    }
+  }, [curveAccountId])
+
   // Merge equity per-account data with provider info + per-account unrealizedPnL from positions
   const accountSources = (data.equity?.accounts ?? []).map(eq => {
     const acct = data.accounts.find(a => a.id === eq.id)
@@ -90,6 +160,32 @@
+ {curvePoints.length > 0 && ( + + )} + + + + {selectedSnapshot && ( + { setSelectedSnapshot(null); setSelectedTimestamp(null) }} + /> + )} + {accountSources.length > 0 && ( )} @@ -388,6 +484,71 @@ function TradeLog({ commits }: { commits: CommitWithAccount[] }) { // ==================== Formatting Helpers ==================== +// ==================== Snapshot Settings ==================== + +const INTERVAL_PRESETS = [ + { label: '1m', value: '1m' }, + { label: '5m', value: '5m' }, + { label: '15m', value: '15m' }, + { label: '30m', value: '30m' }, + { label: '1h', value: '1h' }, +] + +function SnapshotSettings({ enabled, every, onEnabledChange, onEveryChange, saveStatus }: { + enabled: boolean + every: string + onEnabledChange: (v: boolean) => void + onEveryChange: (v: string) => void + saveStatus: string +}) { + const isPreset = INTERVAL_PRESETS.some(p => p.value === every) + const [showCustom, setShowCustom] = useState(!isPreset) + + return ( +
+ Snapshots + +
+ {INTERVAL_PRESETS.map(p => ( + + ))} + +
+ {showCustom && ( + onEveryChange(e.target.value)} + placeholder="e.g. 2h" + /> + )} + {saveStatus === 'saving' && saving...} + {saveStatus === 'error' && save failed} +
+ ) +} + +// ==================== Formatting Helpers ==================== + function fmt(n: number): string { return n >= 1000 ? `$${n.toLocaleString('en-US', { minimumFractionDigits: 2, maximumFractionDigits: 2 })}` : `$${n.toFixed(2)}`