Skip to content

Commit ff7decc

Browse files
committed
feat: AsyncIterable streaming support for library SDK
Streaming commands (log list --follow, dashboard view --refresh) now return AsyncIterable instead of throwing. Consumer break and AbortSignal both wire through to command abort for clean shutdown. Closes #585
1 parent 154d87b commit ff7decc

File tree

13 files changed

+709
-87
lines changed

13 files changed

+709
-87
lines changed

AGENTS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,9 @@ mock.module("./some-module", () => ({
893893
894894
### Architecture
895895
896+
<!-- lore:019d2d10-671c-77d8-9dbc-c32d1604dcf7 -->
897+
* **AsyncIterable streaming for SDK blocked by four structural concerns**: Full \`AsyncIterable\` streaming for SDK (GitHub #585) has four blockers: (1) \`setEnv\` is global mutable state — \`finally\` cleanup can't run until consumer finishes iterating; concurrent streams corrupt each other. (2) \`withTelemetry\` uses callback pattern — streaming needs handle-based spans with deferred \`span.end()\`. (3) Streaming commands use \`process.once('SIGINT')\` for teardown — library mode needs AbortController wired to AsyncGenerator \`return()\`. (4) Codegen needs function overloads for streaming-capable commands (\`log list --follow\`, \`dashboard view --refresh\`). Only two commands currently support streaming. Estimated ~500 lines across ~12 files.
898+
896899
<!-- lore:019d2690-4df2-7ac8-82c4-54656d987339 -->
897900
* **Bundle uses esbuild with bun:sqlite polyfill plugin for Node.js compatibility**: \`script/bundle.ts\` uses esbuild to produce \`dist/index.cjs\` from \`src/index.ts\`. A \`bunSqlitePlugin\` replaces \`bun:sqlite\` imports with a polyfill. Build defines \`SENTRY\_CLI\_VERSION\` and \`SENTRY\_CLIENT\_ID\_BUILD\`, externalizes \`node:\*\` builtins. \`sentrySourcemapPlugin\` handles debug ID injection and sourcemap upload. After the main build, writes: (1) \`dist/bin.cjs\` — CLI wrapper with shebang/Node version check/warning suppression, (2) \`dist/index.d.cts\` — type declarations read from pre-built \`src/sdk.generated.d.cts\`. Both \`sdk.generated.\*\` files are gitignored and regenerated via \`generate:sdk\` script chained before \`bundle\` in \`package.json\`. Debug IDs solve sourcemap deduplication between npm bundle and bun compile builds.
898901

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ Options (all optional):
112112
- `project` — Default project slug.
113113
- `text` — Return human-readable string instead of parsed JSON (affects `run()` only).
114114
- `cwd` — Working directory for DSN auto-detection. Defaults to `process.cwd()`.
115+
- `signal``AbortSignal` to cancel streaming commands (`--follow`, `--refresh`).
116+
117+
Streaming commands return `AsyncIterable` — use `for await...of` and `break` to stop.
115118

116119
Errors are thrown as `SentryError` with `.exitCode` and `.stderr`.
117120

docs/src/content/docs/library-usage.md

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ const sdk = createSentrySDK({ token: "...", text: true, cwd: "/my/project" });
151151
| `project` | `string` | Auto-detected | Default project slug |
152152
| `text` | `boolean` | `false` | Return human-readable text instead of parsed JSON (`run()` only) |
153153
| `cwd` | `string` | `process.cwd()` | Working directory for DSN auto-detection |
154+
| `signal` | `AbortSignal` || Abort signal for cancelling streaming commands |
154155

155156
## Return Values
156157

@@ -217,7 +218,49 @@ Calls should be sequential (awaited one at a time).
217218
- **Node.js >= 22** (required for `node:sqlite`)
218219
- Or **Bun** (any recent version)
219220

220-
:::caution
221-
Streaming flags (`--refresh`, `--follow`) are not supported in library mode
222-
and will throw a `SentryError`. Use the CLI binary directly for live-streaming commands.
221+
## Streaming Commands
222+
223+
Two commands support real-time streaming: `log list --follow` and `dashboard view --refresh`.
224+
When using streaming flags, methods return an `AsyncIterable` instead of a `Promise`:
225+
226+
```typescript
227+
const sdk = createSentrySDK({ token: "sntrys_..." });
228+
229+
// Stream logs as they arrive (polls every 5 seconds)
230+
for await (const log of sdk.log.list({ follow: "5", orgProject: "acme/backend" })) {
231+
console.log(log);
232+
}
233+
234+
// Auto-refresh dashboard (polls every 30 seconds)
235+
for await (const snapshot of sdk.run("dashboard", "view", "123", "--refresh", "30")) {
236+
console.log(snapshot);
237+
}
238+
239+
// Stop streaming by breaking out of the loop
240+
for await (const log of sdk.log.list({ follow: "2" })) {
241+
if (someCondition) break; // Streaming stops immediately
242+
}
243+
```
244+
245+
### Cancellation
246+
247+
`break` in a `for await...of` loop immediately signals the streaming command to stop.
248+
You can also pass an `AbortSignal` via `SentryOptions` for programmatic cancellation:
249+
250+
```typescript
251+
const controller = new AbortController();
252+
const sdk = createSentrySDK({ token: "...", signal: controller.signal });
253+
254+
// Cancel after 30 seconds
255+
setTimeout(() => controller.abort(), 30_000);
256+
257+
for await (const log of sdk.log.list({ follow: "5" })) {
258+
console.log(log);
259+
}
260+
// Loop exits when signal fires
261+
```
262+
263+
:::note
264+
Concurrent streaming calls are not supported. Each streaming invocation
265+
uses an isolated environment — only one can be active at a time.
223266
:::

script/bundle.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ const CORE_DECLARATIONS = `export type SentryOptions = {
226226
text?: boolean;
227227
/** Working directory (affects DSN detection, project root). Defaults to process.cwd(). */
228228
cwd?: string;
229+
/** AbortSignal to cancel streaming commands (e.g. log list --follow). */
230+
signal?: AbortSignal;
231+
};
232+
233+
export type AsyncChannel<T> = AsyncIterable<T> & {
234+
push(value: T): void;
235+
close(): void;
236+
error(err: Error): void;
229237
};
230238
231239
export declare class SentryError extends Error {

script/generate-sdk.ts

Lines changed: 134 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ const INTERNAL_FLAGS = new Set([
3434
"log-level",
3535
"verbose",
3636
"fields",
37-
// Streaming flags produce infinite output — not supported in library mode
38-
"refresh",
39-
"follow",
4037
]);
4138

39+
/** Flags that trigger streaming mode — included in params but change return type */
40+
const STREAMING_FLAGS = new Set(["refresh", "follow"]);
41+
4242
/** Regex for stripping angle-bracket/ellipsis decorators from placeholder names */
4343
const PLACEHOLDER_CLEAN_RE = /[<>.]/g;
4444

@@ -341,13 +341,12 @@ function generateParamsInterface(
341341
return { name: interfaceName, code };
342342
}
343343

344-
/** Generate the method body (invoke call) for a command. */
345-
function generateMethodBody(
344+
/** Build the flag object expression and positional expression for an invoke call. */
345+
function buildInvokeArgs(
346346
path: string[],
347347
positional: PositionalInfo,
348-
flags: SdkFlagInfo[],
349-
returnType: string
350-
): string {
348+
flags: SdkFlagInfo[]
349+
): { flagObj: string; positionalExpr: string; pathStr: string } {
351350
const flagEntries = flags.map((f) => {
352351
const camel = camelCase(f.name);
353352
if (f.name !== camel) {
@@ -368,8 +367,65 @@ function generateMethodBody(
368367

369368
const flagObj =
370369
flagEntries.length > 0 ? `{ ${flagEntries.join(", ")} }` : "{}";
370+
const pathStr = JSON.stringify(path);
371+
372+
return { flagObj, positionalExpr, pathStr };
373+
}
374+
375+
/**
376+
* Generate the method body (invoke call) for a non-streaming command.
377+
* Uses `as Promise<T>` to narrow the invoke union return type, since
378+
* non-streaming calls never pass `meta.streaming` and always return a Promise.
379+
*/
380+
function generateMethodBody(
381+
path: string[],
382+
positional: PositionalInfo,
383+
flags: SdkFlagInfo[],
384+
returnType: string
385+
): string {
386+
const { flagObj, positionalExpr, pathStr } = buildInvokeArgs(
387+
path,
388+
positional,
389+
flags
390+
);
391+
return `invoke<${returnType}>(${pathStr}, ${flagObj}, ${positionalExpr}) as Promise<${returnType}>`;
392+
}
393+
394+
/** Options for generating a streaming method body. */
395+
type StreamingMethodOpts = {
396+
path: string[];
397+
positional: PositionalInfo;
398+
flags: SdkFlagInfo[];
399+
returnType: string;
400+
streamingFlagNames: string[];
401+
indent: string;
402+
};
371403

372-
return `invoke<${returnType}>(${JSON.stringify(path)}, ${flagObj}, ${positionalExpr})`;
404+
/**
405+
* Generate the method body for a streaming-capable command.
406+
*
407+
* Detects at runtime whether any streaming flag is present and passes
408+
* `{ streaming: true }` to the invoker when it is.
409+
*/
410+
function generateStreamingMethodBody(opts: StreamingMethodOpts): string {
411+
const { flagObj, positionalExpr, pathStr } = buildInvokeArgs(
412+
opts.path,
413+
opts.positional,
414+
opts.flags
415+
);
416+
417+
// Build the streaming condition: params?.follow !== undefined || params?.refresh !== undefined
418+
const conditions = opts.streamingFlagNames
419+
.map((name) => `params?.${camelCase(name)} !== undefined`)
420+
.join(" || ");
421+
422+
const lines = [
423+
"{",
424+
`${opts.indent} const streaming = ${conditions};`,
425+
`${opts.indent} return invoke<${opts.returnType}>(${pathStr}, ${flagObj}, ${positionalExpr}, { streaming });`,
426+
`${opts.indent} }`,
427+
];
428+
return lines.join("\n");
373429
}
374430

375431
// ---------------------------------------------------------------------------
@@ -472,6 +528,12 @@ for (const { path, command } of allCommands) {
472528
const flags = extractSdkFlags(command);
473529
const positional = derivePositional(command);
474530

531+
// Detect streaming-capable commands
532+
const streamingFlagNames = flags
533+
.filter((f) => STREAMING_FLAGS.has(f.name))
534+
.map((f) => f.name);
535+
const isStreaming = streamingFlagNames.length > 0;
536+
475537
// Generate return type from schema
476538
const schemaTypeName = `${buildTypeName(path)}Result`;
477539
const returnTypeInfo = generateReturnType(command, schemaTypeName);
@@ -493,40 +555,87 @@ for (const { path, command } of allCommands) {
493555
let paramsArg: string;
494556
let body: string;
495557

558+
const brief = command.brief || path.join(" ");
559+
const methodName = path.at(-1) ?? path[0];
560+
const indent = " ".repeat(path.length - 1);
561+
496562
if (hasVariadicPositional) {
497563
// Variadic: (params: XParams, ...positional: string[]) or (params?: XParams, ...positional: string[])
498564
// Required flags make params required even with variadic positionals
499565
const paramsOpt = hasRequiredFlags ? "" : "?";
500566
paramsArg = params
501567
? `params${paramsOpt}: ${params.name}, ...positional: string[]`
502568
: "...positional: string[]";
503-
body = generateMethodBody(path, positional, flags, returnType);
569+
body = isStreaming
570+
? generateStreamingMethodBody({
571+
path,
572+
positional,
573+
flags,
574+
returnType,
575+
streamingFlagNames,
576+
indent,
577+
})
578+
: generateMethodBody(path, positional, flags, returnType);
504579
} else if (params) {
505580
const paramsRequired = hasRequiredFlags;
506581
paramsArg = paramsRequired
507582
? `params: ${params.name}`
508583
: `params?: ${params.name}`;
509-
body = generateMethodBody(path, positional, flags, returnType);
584+
body = isStreaming
585+
? generateStreamingMethodBody({
586+
path,
587+
positional,
588+
flags,
589+
returnType,
590+
streamingFlagNames,
591+
indent,
592+
})
593+
: generateMethodBody(path, positional, flags, returnType);
510594
} else {
511595
body = generateMethodBody(path, positional, flags, returnType);
512596
paramsArg = "";
513597
}
514598

515-
const brief = command.brief || path.join(" ");
516-
const methodName = path.at(-1) ?? path[0];
517-
const indent = " ".repeat(path.length - 1);
518599
const sig = paramsArg ? `(${paramsArg})` : "()";
519-
const methodCode = [
520-
`${indent} /** ${brief} */`,
521-
`${indent} ${methodName}: ${sig}: Promise<${returnType}> =>`,
522-
`${indent} ${body},`,
523-
].join("\n");
524-
525-
// Type declaration: method signature without implementation
526-
const typeDecl = [
527-
`${indent} /** ${brief} */`,
528-
`${indent} ${methodName}${sig}: Promise<${returnType}>;`,
529-
].join("\n");
600+
601+
let methodCode: string;
602+
let typeDecl: string;
603+
604+
if (isStreaming) {
605+
// Streaming commands use a function body (not arrow expression)
606+
// because they need runtime streaming detection
607+
methodCode = [
608+
`${indent} /** ${brief} */`,
609+
`${indent} ${methodName}: ${sig} => ${body},`,
610+
].join("\n");
611+
612+
// Type declaration: callable interface with overloaded signatures
613+
const streamingFlagTypes = streamingFlagNames.map((name) => {
614+
const flag = flags.find((f) => f.name === name);
615+
return `${camelCase(name)}: ${flag?.tsType ?? "string"}`;
616+
});
617+
const streamingConstraint = streamingFlagTypes.join("; ");
618+
619+
typeDecl = [
620+
`${indent} /** ${brief} */`,
621+
`${indent} ${methodName}: {`,
622+
`${indent} ${sig}: Promise<${returnType}>;`,
623+
`${indent} (params: ${params?.name ?? "Record<string, never>"} & { ${streamingConstraint} }): AsyncIterable<unknown>;`,
624+
`${indent} };`,
625+
].join("\n");
626+
} else {
627+
methodCode = [
628+
`${indent} /** ${brief} */`,
629+
`${indent} ${methodName}: ${sig}: Promise<${returnType}> =>`,
630+
`${indent} ${body},`,
631+
].join("\n");
632+
633+
// Type declaration: method signature without implementation
634+
typeDecl = [
635+
`${indent} /** ${brief} */`,
636+
`${indent} ${methodName}${sig}: Promise<${returnType}>;`,
637+
].join("\n");
638+
}
530639

531640
insertMethod(root, path, methodCode, typeDecl);
532641
}

src/commands/dashboard/view.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ export const viewCommand = buildCommand({
219219
const stop = () => controller.abort();
220220
process.once("SIGINT", stop);
221221

222+
// Library mode: honor external abort signal (e.g., consumer break)
223+
const externalSignal = (this.process as { abortSignal?: AbortSignal })
224+
?.abortSignal;
225+
if (externalSignal) {
226+
externalSignal.addEventListener("abort", stop, { once: true });
227+
}
228+
222229
let isFirstRender = true;
223230

224231
try {

src/commands/log/list.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ type FollowGeneratorConfig<T extends LogLike> = {
256256
* Use this to seed dedup state (e.g., tracking seen log IDs).
257257
*/
258258
onInitialLogs?: (logs: T[]) => void;
259+
/**
260+
* External abort signal (library mode). When aborted, the follow
261+
* generator stops on the next poll cycle. Complements SIGINT handling.
262+
*/
263+
abortSignal?: AbortSignal;
259264
};
260265

261266
/** Find the highest timestamp_precise in a batch, or undefined if none have it. */
@@ -343,11 +348,16 @@ async function* generateFollowLogs<T extends LogLike>(
343348
// timestamp_precise is nanoseconds; Date.now() is milliseconds → convert
344349
let lastTimestamp = Date.now() * 1_000_000;
345350

346-
// AbortController for clean SIGINT handling
351+
// AbortController for clean SIGINT handling + library mode abort
347352
const controller = new AbortController();
348353
const stop = () => controller.abort();
349354
process.once("SIGINT", stop);
350355

356+
// Library mode: honor external abort signal (e.g., consumer break)
357+
if (config.abortSignal) {
358+
config.abortSignal.addEventListener("abort", stop, { once: true });
359+
}
360+
351361
try {
352362
// Initial fetch
353363
const initialLogs = await config.fetch("1m");
@@ -695,6 +705,8 @@ export const listCommand = buildListCommand(
695705
const generator = generateFollowLogs({
696706
flags,
697707
onDiagnostic: (msg) => logger.warn(msg),
708+
abortSignal: (this.process as { abortSignal?: AbortSignal })
709+
?.abortSignal,
698710
fetch: (statsPeriod) =>
699711
listTraceLogs(org, traceId, {
700712
query: flags.query,
@@ -757,6 +769,8 @@ export const listCommand = buildListCommand(
757769
const generator = generateFollowLogs({
758770
flags,
759771
onDiagnostic: (msg) => logger.warn(msg),
772+
abortSignal: (this.process as { abortSignal?: AbortSignal })
773+
?.abortSignal,
760774
fetch: (statsPeriod, afterTimestamp) =>
761775
listLogs(org, project, {
762776
query: flags.query,

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import { buildInvoker, buildRunner } from "./lib/sdk-invoke.js";
2626
import { createSDKMethods } from "./sdk.generated.js";
2727

28+
export type { AsyncChannel } from "./lib/async-channel.js";
2829
// Re-export public types and error class from the shared module.
2930
// These re-exports exist to break a circular dependency between
3031
// index.ts ↔ sdk-invoke.ts. SentryError and SentryOptions live

0 commit comments

Comments
 (0)