Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/fix-lifecycle-event-plumbing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@perstack/skill-manager": patch
"@perstack/runtime": patch
---

fix: connect skill adapter lifecycle events to runtime event system

Replace unused `runtimeEventListener` with `onLifecycleEvent` callback in SkillManagerOptions, thread it through all factory contexts and adapters, and bridge adapter-level lifecycle events (connecting/connected/stderr/disconnected) to runtime events (skillStarting/skillConnected/skillStderr/skillDisconnected) in CoordinatorExecutor. This enables real-time skill lifecycle notifications instead of batch post-connect events.
196 changes: 192 additions & 4 deletions packages/runtime/src/orchestration/coordinator-executor.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Checkpoint, Expert, RunSetting } from "@perstack/core"
import type { Checkpoint, Expert, RunSetting, RuntimeEvent } from "@perstack/core"
import type { SkillAdapterLifecycleEvent } from "@perstack/skill-manager"
import { describe, expect, it, vi } from "vitest"
import { CoordinatorExecutor } from "./coordinator-executor.js"

Expand Down Expand Up @@ -47,7 +48,7 @@ vi.mock("../helpers/index.js", () => ({
})),
}))

const { mockSkillManagerInstance } = vi.hoisted(() => ({
const { mockSkillManagerInstance, capturedOnLifecycleEvent } = vi.hoisted(() => ({
mockSkillManagerInstance: {
getToolDefinitions: vi.fn().mockReturnValue([]),
getAdapterByToolName: vi.fn(),
Expand All @@ -56,12 +57,37 @@ const { mockSkillManagerInstance } = vi.hoisted(() => ({
isClosed: false,
getAdapters: vi.fn().mockReturnValue(new Map()),
},
capturedOnLifecycleEvent: {
current: undefined as ((event: SkillAdapterLifecycleEvent) => void) | undefined,
},
}))

vi.mock("@perstack/skill-manager", () => ({
SkillManager: {
fromExpert: vi.fn().mockResolvedValue(mockSkillManagerInstance),
fromLockfile: vi.fn().mockResolvedValue(mockSkillManagerInstance),
fromExpert: vi
.fn()
.mockImplementation(
(
_expert: unknown,
_experts: unknown,
options: { onLifecycleEvent?: (event: SkillAdapterLifecycleEvent) => void },
) => {
capturedOnLifecycleEvent.current = options?.onLifecycleEvent
return Promise.resolve(mockSkillManagerInstance)
},
),
fromLockfile: vi
.fn()
.mockImplementation(
(
_expert: unknown,
_experts: unknown,
options: { onLifecycleEvent?: (event: SkillAdapterLifecycleEvent) => void },
) => {
capturedOnLifecycleEvent.current = options?.onLifecycleEvent
return Promise.resolve(mockSkillManagerInstance)
},
),
},
}))

Expand Down Expand Up @@ -254,5 +280,167 @@ describe("@perstack/runtime: coordinator-executor", () => {
}),
)
})

it("passes onLifecycleEvent bridge to SkillManager", async () => {
const { SkillManager } = await import("@perstack/skill-manager")
const eventListener = vi.fn()
const executor = new CoordinatorExecutor({ eventListener })
const setting = createMockSetting()

await executor.execute(setting)

expect(SkillManager.fromExpert).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
expect.objectContaining({
onLifecycleEvent: expect.any(Function),
}),
)
})

it("does not pass onLifecycleEvent when no eventListener", async () => {
const { SkillManager } = await import("@perstack/skill-manager")
const executor = new CoordinatorExecutor()
const setting = createMockSetting()

await executor.execute(setting)

expect(SkillManager.fromExpert).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
expect.objectContaining({
onLifecycleEvent: undefined,
}),
)
})
})

describe("lifecycle event bridge", () => {
it("bridges connecting event to skillStarting RuntimeEvent", async () => {
const events: RuntimeEvent[] = []
const eventListener = (event: unknown) => {
const e = event as RuntimeEvent
if ("type" in e && !("stepNumber" in e)) events.push(e)
}
const executor = new CoordinatorExecutor({ eventListener })
await executor.execute(createMockSetting())

const bridge = capturedOnLifecycleEvent.current
expect(bridge).toBeDefined()

bridge!({
type: "connecting",
adapterName: "test-skill",
data: { command: "npx", args: ["@test/pkg"] },
})

const startingEvent = events.find((e) => e.type === "skillStarting")
expect(startingEvent).toBeDefined()
expect(startingEvent).toMatchObject({
type: "skillStarting",
skillName: "test-skill",
command: "npx",
args: ["@test/pkg"],
})
})

it("bridges connected event to skillConnected RuntimeEvent", async () => {
const events: RuntimeEvent[] = []
const eventListener = (event: unknown) => {
const e = event as RuntimeEvent
if ("type" in e && !("stepNumber" in e)) events.push(e)
}
const executor = new CoordinatorExecutor({ eventListener })
await executor.execute(createMockSetting())

const bridge = capturedOnLifecycleEvent.current!

bridge({
type: "connected",
adapterName: "test-skill",
data: { connectDurationMs: 100, spawnDurationMs: 50 },
})

const connectedEvent = events.find((e) => e.type === "skillConnected")
expect(connectedEvent).toBeDefined()
expect(connectedEvent).toMatchObject({
type: "skillConnected",
skillName: "test-skill",
connectDurationMs: 100,
spawnDurationMs: 50,
})
})

it("bridges stderr event to skillStderr RuntimeEvent", async () => {
const events: RuntimeEvent[] = []
const eventListener = (event: unknown) => {
const e = event as RuntimeEvent
if ("type" in e && !("stepNumber" in e)) events.push(e)
}
const executor = new CoordinatorExecutor({ eventListener })
await executor.execute(createMockSetting())

const bridge = capturedOnLifecycleEvent.current!

bridge({
type: "stderr",
adapterName: "test-skill",
data: { message: "warning: something happened" },
})

const stderrEvent = events.find((e) => e.type === "skillStderr")
expect(stderrEvent).toBeDefined()
expect(stderrEvent).toMatchObject({
type: "skillStderr",
skillName: "test-skill",
message: "warning: something happened",
})
})

it("bridges disconnected event to skillDisconnected RuntimeEvent", async () => {
const events: RuntimeEvent[] = []
const eventListener = (event: unknown) => {
const e = event as RuntimeEvent
if ("type" in e && !("stepNumber" in e)) events.push(e)
}
const executor = new CoordinatorExecutor({ eventListener })
await executor.execute(createMockSetting())

const bridge = capturedOnLifecycleEvent.current!

bridge({
type: "disconnected",
adapterName: "test-skill",
})

const disconnectedEvent = events.find((e) => e.type === "skillDisconnected")
expect(disconnectedEvent).toBeDefined()
expect(disconnectedEvent).toMatchObject({
type: "skillDisconnected",
skillName: "test-skill",
})
})

it("does not bridge error events to RuntimeEvent", async () => {
const events: RuntimeEvent[] = []
const eventListener = (event: unknown) => {
const e = event as RuntimeEvent
if ("type" in e && !("stepNumber" in e)) events.push(e)
}
const executor = new CoordinatorExecutor({ eventListener })
await executor.execute(createMockSetting())

const bridge = capturedOnLifecycleEvent.current!

bridge({
type: "error",
adapterName: "test-skill",
data: { error: "connection failed" },
})

// Only initializeRuntime should be in events (emitted during execute), no error bridge
const errorEvents = events.filter((e) => e.type !== "initializeRuntime")
expect(errorEvents).toHaveLength(0)
})
})
})
69 changes: 55 additions & 14 deletions packages/runtime/src/orchestration/coordinator-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
type RuntimeEvent,
type Step,
} from "@perstack/core"
import { SkillManager } from "@perstack/skill-manager"
import { type SkillAdapterLifecycleEvent, SkillManager } from "@perstack/skill-manager"
import pkg from "../../package.json" with { type: "json" }
import { RunEventEmitter } from "../events/event-emitter.js"
import {
Expand Down Expand Up @@ -64,22 +64,24 @@ export class CoordinatorExecutor {

this.emitInitEvent(setting, expertToRun, experts)

const onLifecycleEvent = this.createLifecycleEventBridge(setting)

const lockfileExpert = this.options.lockfile?.experts[setting.expertKey]
const skillManager = lockfileExpert
? await SkillManager.fromLockfile(expertToRun, experts, {
env: setting.env,
perstackBaseSkillCommand: setting.perstackBaseSkillCommand,
isDelegatedRun: !!checkpoint?.delegatedBy,
lockfileToolDefinitions: getLockfileExpertToolDefinitions(lockfileExpert),
onLifecycleEvent,
})
: await SkillManager.fromExpert(expertToRun, experts, {
env: setting.env,
perstackBaseSkillCommand: setting.perstackBaseSkillCommand,
isDelegatedRun: !!checkpoint?.delegatedBy,
onLifecycleEvent,
})

this.emitSkillConnectedEvents(setting, skillManager)

const initialCheckpoint = checkpoint
? createNextStepCheckpoint(createId(), checkpoint, setting.runId)
: createInitialCheckpoint(createId(), {
Expand Down Expand Up @@ -120,17 +122,56 @@ export class CoordinatorExecutor {
}
}

private emitSkillConnectedEvents(setting: RunSetting, skillManager: SkillManager): void {
if (!this.options.eventListener) return

for (const adapter of skillManager.getAdapters().values()) {
if (adapter.type === "delegate" || adapter.type === "interactive") continue
const event = createRuntimeEvent("skillConnected", setting.jobId, setting.runId, {
skillName: adapter.name,
spawnDurationMs: adapter.spawnDurationMs,
totalDurationMs: adapter.connectDurationMs,
})
this.options.eventListener(event)
private createLifecycleEventBridge(
setting: RunSetting,
): ((event: SkillAdapterLifecycleEvent) => void) | undefined {
if (!this.options.eventListener) return undefined
const listener = this.options.eventListener
return (event: SkillAdapterLifecycleEvent) => {
switch (event.type) {
case "connecting": {
const data = event.data ?? {}
listener(
createRuntimeEvent("skillStarting", setting.jobId, setting.runId, {
skillName: event.adapterName,
command: (data.command as string) ?? "",
args: (data.args as string[]) ?? [],
}),
)
break
}
case "connected": {
const data = event.data ?? {}
listener(
createRuntimeEvent("skillConnected", setting.jobId, setting.runId, {
skillName: event.adapterName,
connectDurationMs: data.connectDurationMs as number | undefined,
totalDurationMs: data.connectDurationMs as number | undefined,
spawnDurationMs: data.spawnDurationMs as number | undefined,
}),
)
break
}
case "stderr": {
const data = event.data ?? {}
listener(
createRuntimeEvent("skillStderr", setting.jobId, setting.runId, {
skillName: event.adapterName,
message: (data.message as string) ?? "",
}),
)
break
}
case "disconnected": {
listener(
createRuntimeEvent("skillDisconnected", setting.jobId, setting.runId, {
skillName: event.adapterName,
}),
)
break
}
// "error" events are handled through promise rejection, no RuntimeEvent needed
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions packages/skill-manager/src/adapters/lockfile-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { LockfileToolDefinition, McpSseSkill, McpStdioSkill } from "@persta
import { describe, expect, it, vi } from "vitest"
import type { SkillAdapter } from "../skill-adapter.js"
import type { SkillAdapterFactory } from "../skill-adapter-factory.js"
import type { SkillAdapterLifecycleEvent } from "../types.js"
import { LockfileSkillAdapter } from "./lockfile-adapter.js"

function createMockSkill(overrides: Partial<McpStdioSkill> = {}): McpStdioSkill {
Expand Down Expand Up @@ -279,4 +280,39 @@ describe("@perstack/skill-manager: LockfileSkillAdapter", () => {
expect(adapter.skill).toBe(skill)
})
})

describe("lifecycle events", () => {
it("does not emit spurious events from lockfile adapter itself", async () => {
const events: SkillAdapterLifecycleEvent[] = []
const adapter = new LockfileSkillAdapter({
skill: createMockSkill(),
toolDefinitions: createMockToolDefinitions(),
env: {},
factory: createMockFactory(),
onLifecycleEvent: (event) => events.push(event),
})
await adapter.connect()
// Lockfile adapter's own connect/disconnect should not emit events
expect(events).toHaveLength(0)
})

it("passes onLifecycleEvent to real adapter factory context", async () => {
const onLifecycleEvent = vi.fn()
const mockAdapter = createMockAdapter()
const factory = createMockFactory(mockAdapter)
const adapter = new LockfileSkillAdapter({
skill: createMockSkill(),
toolDefinitions: createMockToolDefinitions(),
env: {},
factory,
onLifecycleEvent,
})
await adapter.connect()
await adapter.callTool("tool-a", {})
expect(factory.createMcp).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ onLifecycleEvent }),
)
})
})
})
Loading