diff --git a/packages/app/src/context/orchestrate.tsx b/packages/app/src/context/orchestrate.tsx index b2bf381..59dd07a 100644 --- a/packages/app/src/context/orchestrate.tsx +++ b/packages/app/src/context/orchestrate.tsx @@ -2699,6 +2699,7 @@ export const { use: useOrchestrate, provider: OrchestrateProvider } = createSimp model: { providerID: string; modelID: string } models?: OrchestrateRoleModelOverrides agent: string + allowInteractiveQuestions?: boolean tokenBudget?: number maxParallelAgents?: number maxRecursionDepth?: number @@ -2778,6 +2779,9 @@ export const { use: useOrchestrate, provider: OrchestrateProvider } = createSimp providerID: roleModels.master.providerID, modelID: roleModels.master.modelID, models: roleModels, + ...(typeof opts.allowInteractiveQuestions === "boolean" + ? { allowInteractiveQuestions: opts.allowInteractiveQuestions } + : {}), ...(opts.tokenBudget ? { tokenBudget: opts.tokenBudget } : {}), ...(opts.maxParallelAgents ? { maxParallelAgents: opts.maxParallelAgents } : {}), ...(opts.maxRecursionDepth ? { maxRecursionDepth: opts.maxRecursionDepth } : {}), diff --git a/packages/app/src/pages/orchestrate.tsx b/packages/app/src/pages/orchestrate.tsx index 36282a1..fde3f19 100644 --- a/packages/app/src/pages/orchestrate.tsx +++ b/packages/app/src/pages/orchestrate.tsx @@ -27,6 +27,7 @@ import { Button } from "@oneshot-ai/ui/button" import { IconButton } from "@oneshot-ai/ui/icon-button" import { Icon } from "@oneshot-ai/ui/icon" import { Select } from "@oneshot-ai/ui/select" +import { Switch as ToggleSwitch } from "@oneshot-ai/ui/switch" import { Tooltip } from "@oneshot-ai/ui/tooltip" import { ProviderIcon } from "@oneshot-ai/ui/provider-icon" import type { IconName } from "@oneshot-ai/ui/icons/provider" @@ -1562,6 +1563,7 @@ function PromptForm() { const [tokenBudget, setTokenBudget] = createSignal(BUDGET_OPTIONS[2]) // 5M default const [maxParallel, setMaxParallel] = createSignal(PARALLEL_OPTIONS[2]) // 20 default const [maxDepth, setMaxDepth] = createSignal(DEPTH_OPTIONS[2]) // 5 default + const [allowInteractiveQuestions, setAllowInteractiveQuestions] = createSignal(false) const [showAdvancedRouting, setShowAdvancedRouting] = createSignal(false) const [roleModels, setRoleModels] = createStore({}) @@ -1732,6 +1734,7 @@ function PromptForm() { } : {}), agent: currentAgent.name, + allowInteractiveQuestions: allowInteractiveQuestions(), tokenBudget: tokenBudget().value, maxParallelAgents: maxParallel().value, maxRecursionDepth: maxDepth().value, @@ -2015,6 +2018,14 @@ function PromptForm() { onSelect={(o) => o && setMaxDepth(o)} /> + +
+ Questions + +
diff --git a/packages/oneshot/src/orchestrator/index.ts b/packages/oneshot/src/orchestrator/index.ts index 2d8a941..dc10d9b 100644 --- a/packages/oneshot/src/orchestrator/index.ts +++ b/packages/oneshot/src/orchestrator/index.ts @@ -67,6 +67,19 @@ function resolveConfiguredMaxReplanCycles(): number { const DEFAULT_MAX_REPLAN_CYCLES = resolveConfiguredMaxReplanCycles() +function parseBooleanFlag(value: string | undefined, fallback: boolean): boolean { + if (!value?.trim()) return fallback + const normalized = value.trim().toLowerCase() + if (["1", "true", "yes", "on"].includes(normalized)) return true + if (["0", "false", "no", "off"].includes(normalized)) return false + return fallback +} + +const DEFAULT_ALLOW_INTERACTIVE_QUESTIONS = parseBooleanFlag( + process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS, + false, +) + function resolveRunMaxReplanCycles( requested: number | undefined, opts: { upgradeLegacyDefault?: boolean } = {}, @@ -94,6 +107,7 @@ export async function startOrchestration(opts: { providerID: string modelID: string models?: RunModelOverrides + allowInteractiveQuestions?: boolean tokenBudget?: number maxParallelAgents?: number maxRecursionDepth?: number @@ -126,6 +140,9 @@ export async function startOrchestration(opts: { provider: runModels.master.providerID, model: runModels.master.modelID, models: runModels, + allowInteractiveQuestions: typeof opts.allowInteractiveQuestions === "boolean" + ? opts.allowInteractiveQuestions + : DEFAULT_ALLOW_INTERACTIVE_QUESTIONS, tokenBudget, tokenKillThreshold: killThreshold, globalTokenUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, diff --git a/packages/oneshot/src/orchestrator/runtime/langgraph-engine.ts b/packages/oneshot/src/orchestrator/runtime/langgraph-engine.ts index df990db..f779eb7 100644 --- a/packages/oneshot/src/orchestrator/runtime/langgraph-engine.ts +++ b/packages/oneshot/src/orchestrator/runtime/langgraph-engine.ts @@ -28,6 +28,7 @@ import { setActiveRun, writeScratchpadEntry, } from "./store" +import { decodeQuestionMemoryValue, isQuestionMemoryScratchpadEntry } from "./question-memory" import { getRegisteredPlan, clearRegisteredPlan } from "../../tool/register-plan" import { Lock } from "../../util/lock" import { @@ -123,19 +124,22 @@ const MERGE_INGEST_SYSTEM_PROMPT = const PLANNER_SESSION_TIMEOUT_MS = parsePlannerSessionTimeout(process.env.ONESHOT_PLANNER_SESSION_TIMEOUT_MS) const PLANNER_MAX_ATTEMPTS = parsePlannerAttempts(process.env.ONESHOT_PLANNER_MAX_ATTEMPTS) const BUDGET_GOVERNOR_ENABLED = parseBooleanFlag(process.env.ONESHOT_BUDGET_GOVERNOR_ENABLED, true) -const ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS = parseBooleanFlag( +const DEFAULT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS = parseBooleanFlag( process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS, - true, + false, ) -const ORCHESTRATOR_PLANNER_SYSTEM_PROMPT = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS - ? "Use the question tool for planner clarifications so questions surface in Mission Control. Ask 3-10 targeted clarifying questions before finalizing register_plan unless those decisions are already resolved." - : "Interactive planner questions are disabled in this orchestration flow. Do not ask user questions in plain text or via the question tool. Capture explicit assumptions, call register_plan, then stop." -const ORCHESTRATOR_PLANNING_TASK_SYSTEM_PROMPT = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS - ? "This is a planning/spec task in orchestrator mode. Ask 3-10 targeted clarifying questions via the question tool before finalizing SPEC.md updates. Wait for answers, then incorporate them." - : "This is a planning/spec task in orchestrator mode with interactive questions disabled. Do not use the question tool. Capture explicit assumptions in SPEC.md and continue." -const ORCHESTRATOR_EXECUTION_TASK_SYSTEM_PROMPT = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS - ? "If ambiguity blocks execution, ask 1-3 targeted clarifying questions via the question tool (not plain text) so they appear in Mission Control." - : "Interactive questions are disabled in this orchestration flow. If ambiguity blocks execution, state explicit assumptions in your summary and continue." +const ORCHESTRATOR_PLANNER_SYSTEM_PROMPT_INTERACTIVE = + "Use the question tool for planner clarifications so questions surface in Mission Control. Ask 3-10 targeted clarifying questions before finalizing register_plan unless those decisions are already resolved." +const ORCHESTRATOR_PLANNER_SYSTEM_PROMPT_NONINTERACTIVE = + "Interactive planner questions are disabled in this orchestration flow. Do not ask user questions in plain text or via the question tool. Capture explicit assumptions, call register_plan, then stop." +const ORCHESTRATOR_PLANNING_TASK_SYSTEM_PROMPT_INTERACTIVE = + "This is a planning/spec task in orchestrator mode. Ask 3-10 targeted clarifying questions via the question tool before finalizing SPEC.md updates. Wait for answers, then incorporate them." +const ORCHESTRATOR_PLANNING_TASK_SYSTEM_PROMPT_NONINTERACTIVE = + "This is a planning/spec task in orchestrator mode with interactive questions disabled. Do not use the question tool. Capture explicit assumptions in SPEC.md and continue." +const ORCHESTRATOR_EXECUTION_TASK_SYSTEM_PROMPT_INTERACTIVE = + "If ambiguity blocks execution, ask 1-3 targeted clarifying questions via the question tool (not plain text) so they appear in Mission Control." +const ORCHESTRATOR_EXECUTION_TASK_SYSTEM_PROMPT_NONINTERACTIVE = + "Interactive questions are disabled in this orchestration flow. If ambiguity blocks execution, state explicit assumptions in your summary and continue." const DEFAULT_MAX_REPLAN_CYCLES = parseMaxReplanCycles(process.env.ONESHOT_MAX_REPLAN_CYCLES) // NOTE: Keep this value in sync with LEGACY_DEFAULT_MAX_REPLAN_CYCLES in orchestrator/index.ts const LEGACY_DEFAULT_MAX_REPLAN_CYCLES = 3 @@ -151,6 +155,10 @@ const RETRY_MODE_REPLACEMENT_AGENT = "replacement_agent" const REPLACEMENT_BASE_BRANCH_METADATA_KEY = "_replacementStartBranch" const MERGE_INGEST_FAILURE_RE = /merge ingest|deterministic merge failed|merge blocked by untracked files|left unresolved conflicts|did not integrate branch/i +const DEADLOCK_TASK_ERROR_RE = + /(deadlock detected|no schedulable path to completion|subtask deadlock detected)/i +const SUPERVISOR_RECOVERABLE_FAILURE_RE = + /(terminated by supervisor|terminated:\s*agent stale|bootstrap stalled|output unchanged|^aborted$)/i const MERGE_INGEST_SELF_HEAL_EXACT_PATHS = new Set([".oneshot/oneshot.jsonc"]) const MERGE_INGEST_SELF_HEAL_PATH_PATTERNS = [ /(^|\/)__pycache__\//, @@ -594,8 +602,13 @@ export async function markUnschedulablePendingTasks( } } + incrementRunRuntimeStats(run, { deadlockEvents: 1 }) const blockedTaskIds: string[] = [] + const recoverabilityMemo = new Map() for (const task of pendingTasks) { + if (isTaskRecoverableForScheduling(graph, task.id, recoverabilityMemo, new Set())) { + continue + } task.status = "blocked" task.error = task.error ?? reason task.time.completed = Date.now() @@ -665,6 +678,13 @@ export async function executeTaskById( subtaskIds: uniqueNonEmptyStrings(task.subtaskIds), } } + + task.metrics = { + ...(task.metrics ?? {}), + dispatchCount: (task.metrics?.dispatchCount ?? 0) + 1, + lastDispatchedAt: Date.now(), + } + incrementRunRuntimeStats(run, { totalDispatches: 1 }) } if (task.status === "completed") { @@ -1142,6 +1162,7 @@ async function runInteractivePlanning(input: { input.abortSignal?.throwIfAborted() clearRegisteredPlan(session.id) + const interactiveQuestionsEnabled = allowInteractiveQuestionsForRun(input.run) let planFromPlanner: GeneratedPlan | undefined for (let attempt = 1; attempt <= PLANNER_MAX_ATTEMPTS; attempt++) { const plannerPrompt = attempt === 1 @@ -1149,14 +1170,15 @@ async function runInteractivePlanning(input: { rootPrompt: input.rootPrompt, researchContext: input.researchContext, scratchpadContext: input.scratchpadContext, + interactiveQuestionsEnabled, }) - : buildInteractivePlannerRecoveryPrompt(recoveryFeedback) + : buildInteractivePlannerRecoveryPrompt(interactiveQuestionsEnabled, recoveryFeedback) const promptPromise = SessionPrompt.prompt({ messageID: Identifier.ascending("message"), sessionID: session.id, agent: "orchestrator-planner", - system: ORCHESTRATOR_PLANNER_SYSTEM_PROMPT, + system: plannerSystemPromptForRun(input.run), model: { providerID: input.providerID, modelID: input.modelID, @@ -1541,12 +1563,24 @@ function materializeChecklistTasksForExecution(input: { return created } +function allowInteractiveQuestionsForRun(run: Pick): boolean { + if (typeof run.allowInteractiveQuestions === "boolean") return run.allowInteractiveQuestions + return DEFAULT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS +} + +function plannerSystemPromptForRun(run: Pick): string { + return allowInteractiveQuestionsForRun(run) + ? ORCHESTRATOR_PLANNER_SYSTEM_PROMPT_INTERACTIVE + : ORCHESTRATOR_PLANNER_SYSTEM_PROMPT_NONINTERACTIVE +} + function buildInteractivePlannerPrompt(input: { rootPrompt: string researchContext?: Record scratchpadContext?: string + interactiveQuestionsEnabled: boolean }): string { - const interactiveQuestionsEnabled = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS + const interactiveQuestionsEnabled = input.interactiveQuestionsEnabled const contextSnippet = input.researchContext ? JSON.stringify(input.researchContext, null, 2).slice(0, MAX_RESEARCH_CONTEXT_CHARS) : "" @@ -1654,14 +1688,17 @@ ${firstTurnConstraint} - After register_plan succeeds, STOP. Do not execute tasks yourself.` } -function buildInteractivePlannerRecoveryPrompt(feedback?: string): string { +function buildInteractivePlannerRecoveryPrompt( + interactiveQuestionsEnabled: boolean, + feedback?: string, +): string { const feedbackSnippet = feedback?.trim() ? `Previous attempt failed quality checks: ${feedback.trim().slice(0, 1_600)} ` : "" - const recoveryStepOne = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS + const recoveryStepOne = interactiveQuestionsEnabled ? `1) Ask targeted clarifying questions via the \`question\` tool when needed (target 3-10 total planner questions in this session). - If you already asked 3-10 useful questions, proceed with explicit assumptions for any remaining ambiguity.` : "1) Do NOT ask user clarifying questions (plain text or via `question` tool). Capture explicit assumptions for any remaining ambiguity." @@ -1944,6 +1981,66 @@ async function supervisorNode(state: EngineState): Promise> return {} } + const autoRecoveredDeadlockTasks = recoverDeadlockBlockedTasksForRetry(graph) + if (autoRecoveredDeadlockTasks.length > 0) { + const now = Date.now() + graph.progress = computeGraphProgress(graph) + run.time.updated = now + emitSupervisorThought(run.id, { + reasoning: + `Recovered ${autoRecoveredDeadlockTasks.length} deadlock-blocked task(s) by re-queueing them: ` + + `${autoRecoveredDeadlockTasks.join(", ")}.`, + action: "intervene", + }) + writeScratchpadEntry(run.id, { + key: `deadlock-recovery-${now}`, + value: + `Auto-requeued deadlock-blocked task(s): ${autoRecoveredDeadlockTasks.join(", ")} ` + + `(recovery budget per task: ${DEADLOCK_BLOCKED_RECOVERY_MAX_RETRIES}).`, + source: "supervisor", + category: "warning", + createdAt: now, + }) + emitScratchpadUpdated(run.id) + emitRunUpdated(run) + await persistSnapshot(state.projectRoot, run, graph) + log.warn("Supervisor auto-requeued deadlock-blocked tasks", { + runId: run.id, + taskIds: autoRecoveredDeadlockTasks, + }) + return {} + } + + const autoRecoveredSupervisorTasks = recoverSupervisorTerminatedTasksForRetry(failedTasks) + if (autoRecoveredSupervisorTasks.length > 0) { + const now = Date.now() + graph.progress = computeGraphProgress(graph) + run.time.updated = now + emitSupervisorThought(run.id, { + reasoning: + `Recovered ${autoRecoveredSupervisorTasks.length} supervisor-terminated task(s) by re-queueing them: ` + + `${autoRecoveredSupervisorTasks.join(", ")}.`, + action: "intervene", + }) + writeScratchpadEntry(run.id, { + key: `supervisor-termination-recovery-${now}`, + value: + `Auto-requeued supervisor-terminated task(s): ${autoRecoveredSupervisorTasks.join(", ")} ` + + `(recovery budget per task: ${SUPERVISOR_TERMINATION_RECOVERY_MAX_RETRIES}).`, + source: "supervisor", + category: "warning", + createdAt: now, + }) + emitScratchpadUpdated(run.id) + emitRunUpdated(run) + await persistSnapshot(state.projectRoot, run, graph) + log.warn("Supervisor auto-requeued terminated tasks", { + runId: run.id, + taskIds: autoRecoveredSupervisorTasks, + }) + return {} + } + // If only legacy deploy-remediation tasks failed, finalize and defer healing to Deploy Agent. if (replanningFailedTasks.length === 0 && failedGates.length === 0) { if (failedTasks.length === 0) { @@ -2075,6 +2172,36 @@ async function supervisorNode(state: EngineState): Promise> }) if (!modelDecision || !modelDecision.shouldReplan) { + const fallbackRetriedTasks = recoverGenericFailedTasksForRetry(replanningFailedTasks, { + maxTasks: Math.min(4, MAX_REPLAN_RETRY_TASKS), + }) + if (fallbackRetriedTasks.length > 0) { + const now = Date.now() + graph.progress = computeGraphProgress(graph) + run.time.updated = now + emitSupervisorThought(run.id, { + reasoning: + `Replan was skipped, so supervisor automatically retried ${fallbackRetriedTasks.length} failed task(s): ` + + `${fallbackRetriedTasks.join(", ")}.`, + action: "intervene", + }) + writeScratchpadEntry(run.id, { + key: `fallback-retry-${now}`, + value: `Fallback-retried failed task(s): ${fallbackRetriedTasks.join(", ")}.`, + source: "supervisor", + category: "warning", + createdAt: now, + }) + emitScratchpadUpdated(run.id) + emitRunUpdated(run) + await persistSnapshot(state.projectRoot, run, graph) + log.warn("Supervisor fallback-retried failed tasks after no-replan decision", { + runId: run.id, + taskIds: fallbackRetriedTasks, + }) + return {} + } + log.info("Supervisor decided not to replan", { runId: run.id, reasoning: modelDecision?.reasoning ?? "decision failed", @@ -2095,6 +2222,36 @@ async function supervisorNode(state: EngineState): Promise> && decision.tasksToRetry.length === 0 && decision.tasksToSkip.length === 0 ) { + const fallbackRetriedTasks = recoverGenericFailedTasksForRetry(replanningFailedTasks, { + maxTasks: Math.min(4, MAX_REPLAN_RETRY_TASKS), + }) + if (fallbackRetriedTasks.length > 0) { + const now = Date.now() + graph.progress = computeGraphProgress(graph) + run.time.updated = now + emitSupervisorThought(run.id, { + reasoning: + "Supervisor replan had no actionable edits; applied deterministic fallback retries for failed tasks: " + + `${fallbackRetriedTasks.join(", ")}.`, + action: "intervene", + }) + writeScratchpadEntry(run.id, { + key: `fallback-retry-${now}`, + value: `Fallback-retried failed task(s): ${fallbackRetriedTasks.join(", ")}.`, + source: "supervisor", + category: "warning", + createdAt: now, + }) + emitScratchpadUpdated(run.id) + emitRunUpdated(run) + await persistSnapshot(state.projectRoot, run, graph) + log.warn("Supervisor fallback-retried failed tasks after empty replan", { + runId: run.id, + taskIds: fallbackRetriedTasks, + }) + return {} + } + emitSupervisorThought(run.id, { reasoning: "Supervisor replan produced no actionable changes after deduplication/safety checks.", action: "observe", @@ -2150,24 +2307,23 @@ async function supervisorNode(state: EngineState): Promise> typeof retryMetadata[REPLACEMENT_BASE_BRANCH_METADATA_KEY] === "string" ? String(retryMetadata[REPLACEMENT_BASE_BRANCH_METADATA_KEY]).trim() : "" - if ( + const useReplacementAgent = existingRetryMode === RETRY_MODE_REPLACEMENT_AGENT || /^Terminated by supervisor:/i.test(priorError) || inheritedReplacementBranch - ) { - retryMetadata[RETRY_MODE_METADATA_KEY] = RETRY_MODE_REPLACEMENT_AGENT - } else { - delete retryMetadata[RETRY_MODE_METADATA_KEY] + if (inheritedReplacementBranch) { + setTaskReplacementBranch(task, inheritedReplacementBranch) + } else if (task.agentBranch) { + setTaskReplacementBranch(task, task.agentBranch) } task.metadata = retryMetadata - task.status = "pending" - task.error = undefined - task.result = undefined - task.qualityGate = undefined - task.sessionId = undefined + resetTaskForRetry(task) + if ( + useReplacementAgent + ) { + setTaskRetryMode(task, RETRY_MODE_REPLACEMENT_AGENT) + } task.retryCount += 1 - task.time.started = undefined - task.time.completed = undefined if (retry.revisedDescription) task.description = retry.revisedDescription if (retry.revisedAcceptanceCriteria) { task.metadata = { ...task.metadata, acceptanceCriteria: retry.revisedAcceptanceCriteria } @@ -3946,7 +4102,9 @@ async function runTaskPromptInDirectory(input: { const useMergeSpecialist = shouldRouteTaskToMergeAgent(input.task) const prompt = useMergeSpecialist ? buildMergeRemediationPrompt(input) : buildTaskPrompt(input) - const taskSystemPrompt = useMergeSpecialist ? undefined : buildTaskExecutionSystemPrompt(input.task) + const taskSystemPrompt = useMergeSpecialist + ? undefined + : buildTaskExecutionSystemPrompt(input.task, input.run) const promptPromise = SessionPrompt.prompt({ messageID: Identifier.ascending("message"), sessionID: session.id, @@ -4641,6 +4799,7 @@ async function planSubOrchestratorSubtasks(state: EngineState, task: Orchestrato taskDescription: task.description, parentPlanningSpec: graph.planningSpec, researchContext: state.researchContext, + interactiveQuestionsEnabled: allowInteractiveQuestionsForRun(run), }) state.abortSignal?.throwIfAborted() @@ -4649,7 +4808,7 @@ async function planSubOrchestratorSubtasks(state: EngineState, task: Orchestrato messageID: Identifier.ascending("message"), sessionID: subPlannerSession.id, agent: "orchestrator-planner", - system: ORCHESTRATOR_PLANNER_SYSTEM_PROMPT, + system: plannerSystemPromptForRun(run), model: { providerID: subOrchestratorModel.providerID, modelID: subOrchestratorModel.modelID }, parts: [{ type: "text", text: subPrompt }], }) @@ -4791,6 +4950,7 @@ function buildSubOrchestratorPrompt(input: { taskDescription: string parentPlanningSpec?: Orchestrator.PlanningSpec researchContext?: Record + interactiveQuestionsEnabled: boolean }): string { const parentContext = input.parentPlanningSpec ? `\n## Parent Plan Context\n- Objective: ${input.parentPlanningSpec.objective}\n- Success: ${input.parentPlanningSpec.successDefinition}` @@ -4799,7 +4959,7 @@ function buildSubOrchestratorPrompt(input: { const researchSnippet = input.researchContext ? JSON.stringify(input.researchContext, null, 2).slice(0, 3_000) : "" - const subPlannerQuestionStep = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS + const subPlannerQuestionStep = input.interactiveQuestionsEnabled ? "2. Ask clarifying questions via the `question` tool when subsystem scope/contracts are ambiguous (1-5 targeted questions). If clarity is sufficient, proceed with explicit assumptions." : "2. Do NOT ask user clarifying questions (plain text or `question` tool). If scope/contracts are ambiguous, proceed with explicit assumptions." @@ -4913,12 +5073,23 @@ Commands Run: \`\`\`` } -function buildTaskExecutionSystemPrompt(task: Orchestrator.Task): string { - if (isPlanningTask(task)) return ORCHESTRATOR_PLANNING_TASK_SYSTEM_PROMPT - return ORCHESTRATOR_EXECUTION_TASK_SYSTEM_PROMPT +function buildTaskExecutionSystemPrompt( + task: Orchestrator.Task, + run: Pick, +): string { + const interactiveQuestionsEnabled = allowInteractiveQuestionsForRun(run) + if (isPlanningTask(task)) { + return interactiveQuestionsEnabled + ? ORCHESTRATOR_PLANNING_TASK_SYSTEM_PROMPT_INTERACTIVE + : ORCHESTRATOR_PLANNING_TASK_SYSTEM_PROMPT_NONINTERACTIVE + } + return interactiveQuestionsEnabled + ? ORCHESTRATOR_EXECUTION_TASK_SYSTEM_PROMPT_INTERACTIVE + : ORCHESTRATOR_EXECUTION_TASK_SYSTEM_PROMPT_NONINTERACTIVE } function buildTaskPrompt(input: { + run: Orchestrator.Run rootPrompt: string task: Orchestrator.Task dependencyContext: string @@ -4971,10 +5142,11 @@ ${formatVerificationSpecForPrompt(verificationSpec)} ${formatVerificationFailureFeedbackForPrompt(verificationFeedback)} ` : "" - const planningQuestionGuidance = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS + const interactiveQuestionsEnabled = allowInteractiveQuestionsForRun(input.run) + const planningQuestionGuidance = interactiveQuestionsEnabled ? "- Before finalizing SPEC.md, ask 3-10 targeted clarifying questions via the `question` tool so they appear in Mission Control's escalation panel.\n- Wait for user answers, then incorporate them into the spec decisions." : "- Interactive questions are disabled in this orchestration flow.\n- Do NOT ask user questions in plain text and do NOT use the `question` tool.\n- Capture explicit assumptions in SPEC.md and continue." - const executionQuestionGuidance = ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS + const executionQuestionGuidance = interactiveQuestionsEnabled ? "- If ambiguity blocks implementation, ask 1-3 targeted clarifying questions via the `question` tool (not plain assistant text)." : "- Interactive questions are disabled in this orchestration flow. If ambiguity blocks implementation, capture explicit assumptions in your summary and continue." @@ -5075,7 +5247,17 @@ ${outputRequirements}` function buildScratchpadContext(graph: Orchestrator.TaskGraph): string { if (!graph.scratchpad || graph.scratchpad.length === 0) return "" const entries = graph.scratchpad.slice(-30) - const lines = entries.map((e) => `[${e.category}] ${e.key}: ${e.value}`) + const lines = entries.map((entry) => { + if (isQuestionMemoryScratchpadEntry(entry)) { + const parsed = decodeQuestionMemoryValue(entry.value) + if (parsed) { + const answerSummary = parsed.answers.length > 0 ? parsed.answers.join(", ") : "Unanswered" + const sourceLabel = parsed.source === "memory" ? "shared-memory" : parsed.source + return `[decision] prior-answer: "${parsed.question}" => ${answerSummary} (source: ${sourceLabel})` + } + } + return `[${entry.category}] ${entry.key}: ${entry.value}` + }) return lines.join("\n").slice(0, MAX_SCRATCHPAD_CONTEXT_CHARS) } @@ -5562,6 +5744,44 @@ function isSuccessfulDependencyStatus(status: Orchestrator.TaskStatus | undefine return status === "completed" || status === "skipped" } +function isTaskRecoverableForScheduling( + graph: Orchestrator.TaskGraph, + taskId: string, + memo: Map, + visiting: Set, +): boolean { + const memoized = memo.get(taskId) + if (typeof memoized === "boolean") return memoized + if (visiting.has(taskId)) return false + + const task = graph.tasks[taskId] + if (!task) { + memo.set(taskId, false) + return false + } + + if (isSuccessfulDependencyStatus(task.status) || task.status === "in_progress") { + memo.set(taskId, true) + return true + } + if (task.status !== "pending") { + memo.set(taskId, false) + return false + } + + visiting.add(taskId) + for (const depId of task.dependencies) { + if (!isTaskRecoverableForScheduling(graph, depId, memo, visiting)) { + visiting.delete(taskId) + memo.set(taskId, false) + return false + } + } + visiting.delete(taskId) + memo.set(taskId, true) + return true +} + function isLaneCompleteForScope( graph: Orchestrator.TaskGraph, scopeTasks: Orchestrator.Task[], @@ -5685,6 +5905,20 @@ function isRunCancelledTaskError(error: unknown): boolean { || reason.includes("canceled during") } +function isDeadlockTaskError(error: unknown): boolean { + if (typeof error !== "string") return false + const reason = error.trim() + if (!reason) return false + return DEADLOCK_TASK_ERROR_RE.test(reason) +} + +function isSupervisorRecoverableTaskError(error: unknown): boolean { + if (typeof error !== "string") return false + const reason = error.trim() + if (!reason) return false + return SUPERVISOR_RECOVERABLE_FAILURE_RE.test(reason) +} + function resetTaskForRetry(task: Orchestrator.Task): void { task.status = "pending" task.error = undefined @@ -5722,6 +5956,88 @@ function recoverRunCancelledSkippedTasksForRetry(graph: Orchestrator.TaskGraph): return recovered } +function recoverDeadlockBlockedTasksForRetry(graph: Orchestrator.TaskGraph): string[] { + if (DEADLOCK_BLOCKED_RECOVERY_MAX_RETRIES <= 0) return [] + + const recovered: string[] = [] + const recoverabilityMemo = new Map() + for (const task of Object.values(graph.tasks)) { + if (task.status !== "blocked") continue + if (!isDeadlockTaskError(task.error)) continue + + const metadata = ensureTaskMetadataObject(task) + const recoveryAttempts = safeNumber(metadata._deadlockRecoveryRetries) + if (recoveryAttempts >= DEADLOCK_BLOCKED_RECOVERY_MAX_RETRIES) continue + + const dependencyPathRecoverable = task.dependencies.every((depId) => + isTaskRecoverableForScheduling(graph, depId, recoverabilityMemo, new Set())) + if (!dependencyPathRecoverable) continue + + metadata._deadlockRecoveryRetries = recoveryAttempts + 1 + metadata._deadlockLastError = task.error + task.metadata = metadata + resetTaskForRetry(task) + recovered.push(task.id) + emitTaskUpdated(task) + } + + return recovered +} + +function recoverSupervisorTerminatedTasksForRetry(failedTasks: Orchestrator.Task[]): string[] { + if (SUPERVISOR_TERMINATION_RECOVERY_MAX_RETRIES <= 0) return [] + + const recovered: string[] = [] + for (const task of failedTasks) { + if (task.status !== "failed" && task.status !== "blocked") continue + if (isDeadlockTaskError(task.error)) continue + + const metadata = ensureTaskMetadataObject(task) + const lastError = typeof metadata._supervisorRecoveryLastError === "string" + ? metadata._supervisorRecoveryLastError + : "" + const corpus = `${task.error ?? ""}\n${lastError}` + if (!isSupervisorRecoverableTaskError(corpus)) continue + + const recoveryAttempts = safeNumber(metadata._supervisorRecoveryRetries) + if (recoveryAttempts >= SUPERVISOR_TERMINATION_RECOVERY_MAX_RETRIES) continue + + metadata._supervisorRecoveryRetries = recoveryAttempts + 1 + metadata._supervisorRecoveryLastError = task.error ?? lastError + task.metadata = metadata + if (task.agentBranch) setTaskReplacementBranch(task, task.agentBranch) + resetTaskForRetry(task) + setTaskRetryMode(task, RETRY_MODE_REPLACEMENT_AGENT) + recovered.push(task.id) + emitTaskUpdated(task) + } + + return recovered +} + +function recoverGenericFailedTasksForRetry( + failedTasks: Orchestrator.Task[], + options: { maxTasks: number }, +): string[] { + const recovered: string[] = [] + for (const task of failedTasks) { + if (recovered.length >= options.maxTasks) break + if (task.status !== "failed" && task.status !== "blocked") continue + if (task.retryCount >= task.maxRetries) continue + if (isDeadlockTaskError(task.error)) continue + if (isRunCancelledTaskError(task.error)) continue + if (isSupervisorRecoverableTaskError(task.error ?? "")) continue + if (MERGE_INGEST_FAILURE_RE.test(task.error ?? "")) continue + + if (task.agentBranch) setTaskReplacementBranch(task, task.agentBranch) + resetTaskForRetry(task) + task.retryCount += 1 + recovered.push(task.id) + emitTaskUpdated(task) + } + return recovered +} + function recoverMergeIngestFailedTasksForRetry(failedTasks: Orchestrator.Task[]): string[] { if (MERGE_INGEST_SUPERVISOR_MAX_RETRIES <= 0) return [] @@ -6168,6 +6484,12 @@ const MERGE_INGEST_SUPERVISOR_MAX_RETRIES = parseMergeIngestSupervisorRetries( const RUN_CANCELLED_RECOVERY_MAX_RETRIES = parseRunCancelledRecoveryRetries( process.env.ONESHOT_RUN_CANCELLED_RECOVERY_MAX_RETRIES, ) +const DEADLOCK_BLOCKED_RECOVERY_MAX_RETRIES = parseDeadlockBlockedRecoveryRetries( + process.env.ONESHOT_DEADLOCK_BLOCKED_RECOVERY_MAX_RETRIES, +) +const SUPERVISOR_TERMINATION_RECOVERY_MAX_RETRIES = parseSupervisorTerminationRecoveryRetries( + process.env.ONESHOT_SUPERVISOR_TERMINATION_RECOVERY_MAX_RETRIES, +) const MERGE_INGEST_BASE_DELAY_MS = 5_000 // 5s const MERGE_INGEST_MAX_DELAY_MS = 60_000 // 60s const USAGE_LIMIT_ERROR_RE = @@ -6381,6 +6703,20 @@ function parseRunCancelledRecoveryRetries(value: string | undefined): number { return Math.max(0, Math.min(20, Math.floor(parsed))) } +function parseDeadlockBlockedRecoveryRetries(value: string | undefined): number { + if (!value?.trim()) return 6 + const parsed = Number(value) + if (!Number.isFinite(parsed)) return 6 + return Math.max(0, Math.min(20, Math.floor(parsed))) +} + +function parseSupervisorTerminationRecoveryRetries(value: string | undefined): number { + if (!value?.trim()) return 4 + const parsed = Number(value) + if (!Number.isFinite(parsed)) return 4 + return Math.max(0, Math.min(12, Math.floor(parsed))) +} + function parseInstanceBootstrapTimeout(value: string | undefined): number { if (!value?.trim()) return DEFAULT_INSTANCE_BOOTSTRAP_TIMEOUT_MS const parsed = Number(value) diff --git a/packages/oneshot/src/orchestrator/runtime/question-memory.ts b/packages/oneshot/src/orchestrator/runtime/question-memory.ts new file mode 100644 index 0000000..acbb3b4 --- /dev/null +++ b/packages/oneshot/src/orchestrator/runtime/question-memory.ts @@ -0,0 +1,164 @@ +import { createHash } from "node:crypto" +import type { Orchestrator } from "../types" + +export const QUESTION_MEMORY_KEY_PREFIX = "question-memory:" +const QUESTION_MEMORY_VALUE_PREFIX = "QMEMv1 " + +export type QuestionMemorySource = "user" | "parent" | "escalation" | "memory" + +export interface QuestionMemoryRecord { + version: 1 + question: string + header?: string + normalizedQuestion: string + answers: string[] + source: QuestionMemorySource + sourceSessionID?: string + updatedAt: number +} + +function normalizeAnswerValues(values: string[]): string[] { + const normalized = [] as string[] + const seen = new Set() + + for (const value of values) { + if (typeof value !== "string") continue + const trimmed = value.trim() + if (!trimmed) continue + const key = trimmed.toLowerCase() + if (seen.has(key)) continue + seen.add(key) + normalized.push(trimmed) + } + + return normalized +} + +export function normalizeQuestionMemoryText(value: string): string { + return value + .toLowerCase() + .replace(/[`"'’]/g, "") + .replace(/[^a-z0-9]+/g, " ") + .trim() + .replace(/\s+/g, " ") +} + +export function createQuestionMemoryNormalizedQuestion(input: { question: string; header?: string }): string { + const combined = [input.header?.trim(), input.question.trim()].filter((part) => Boolean(part)).join(" ") + return normalizeQuestionMemoryText(combined) +} + +export function createQuestionMemoryKey(input: { question: string; header?: string }): string { + const normalizedQuestion = createQuestionMemoryNormalizedQuestion(input) + const fingerprint = createHash("sha1") + .update(normalizedQuestion || "empty-question") + .digest("hex") + .slice(0, 16) + return `${QUESTION_MEMORY_KEY_PREFIX}${fingerprint}` +} + +export function buildQuestionMemoryRecord(input: { + question: string + header?: string + answers: string[] + source: QuestionMemorySource + sourceSessionID?: string + updatedAt?: number +}): QuestionMemoryRecord | undefined { + const question = input.question.trim() + if (!question) return undefined + + const answers = normalizeAnswerValues(input.answers) + if (answers.length === 0) return undefined + + const normalizedQuestion = createQuestionMemoryNormalizedQuestion({ + question, + header: input.header, + }) + if (!normalizedQuestion) return undefined + + return { + version: 1, + question, + ...(input.header?.trim() ? { header: input.header.trim() } : {}), + normalizedQuestion, + answers, + source: input.source, + ...(input.sourceSessionID?.trim() ? { sourceSessionID: input.sourceSessionID.trim() } : {}), + updatedAt: input.updatedAt ?? Date.now(), + } +} + +export function encodeQuestionMemoryValue(record: QuestionMemoryRecord): string { + return `${QUESTION_MEMORY_VALUE_PREFIX}${JSON.stringify(record)}` +} + +function parseQuestionMemoryPayload(payload: unknown): QuestionMemoryRecord | undefined { + if (!payload || typeof payload !== "object") return undefined + const raw = payload as Record + const question = typeof raw.question === "string" ? raw.question.trim() : "" + if (!question) return undefined + const header = typeof raw.header === "string" ? raw.header.trim() : undefined + const answers = Array.isArray(raw.answers) + ? raw.answers + .filter((value): value is string => typeof value === "string") + .map((value) => value.trim()) + : [] + + const sourceCandidate = typeof raw.source === "string" ? raw.source.trim().toLowerCase() : "" + const source: QuestionMemorySource = sourceCandidate === "parent" + ? "parent" + : sourceCandidate === "escalation" + ? "escalation" + : sourceCandidate === "memory" + ? "memory" + : "user" + + return buildQuestionMemoryRecord({ + question, + header, + answers, + source, + sourceSessionID: typeof raw.sourceSessionID === "string" ? raw.sourceSessionID : undefined, + updatedAt: typeof raw.updatedAt === "number" && Number.isFinite(raw.updatedAt) ? raw.updatedAt : Date.now(), + }) +} + +export function decodeQuestionMemoryValue(value: string): QuestionMemoryRecord | undefined { + const trimmed = value.trim() + if (!trimmed) return undefined + + const payload = trimmed.startsWith(QUESTION_MEMORY_VALUE_PREFIX) + ? trimmed.slice(QUESTION_MEMORY_VALUE_PREFIX.length) + : trimmed + + try { + const parsed = JSON.parse(payload) + return parseQuestionMemoryPayload(parsed) + } catch { + return undefined + } +} + +export function isQuestionMemoryScratchpadEntry(entry: Pick): boolean { + return entry.key.startsWith(QUESTION_MEMORY_KEY_PREFIX) +} + +export function extractQuestionMemoryFromScratchpad( + entries: Orchestrator.ScratchpadEntry[], +): QuestionMemoryRecord[] { + const byQuestion = new Map() + + for (const entry of entries) { + if (!isQuestionMemoryScratchpadEntry(entry)) continue + const parsed = decodeQuestionMemoryValue(entry.value) + if (!parsed) continue + + const existing = byQuestion.get(parsed.normalizedQuestion) + if (!existing || parsed.updatedAt >= existing.updatedAt) { + byQuestion.set(parsed.normalizedQuestion, parsed) + } + } + + return [...byQuestion.values()].sort((left, right) => left.updatedAt - right.updatedAt) +} diff --git a/packages/oneshot/src/orchestrator/runtime/store.ts b/packages/oneshot/src/orchestrator/runtime/store.ts index d05b76e..d65c080 100644 --- a/packages/oneshot/src/orchestrator/runtime/store.ts +++ b/packages/oneshot/src/orchestrator/runtime/store.ts @@ -1,4 +1,10 @@ import type { Orchestrator } from "../types" +import { + buildQuestionMemoryRecord, + createQuestionMemoryKey, + encodeQuestionMemoryValue, + type QuestionMemorySource, +} from "./question-memory" const activeRuns = new Map() const activeGraphs = new Map() @@ -208,6 +214,31 @@ export function resolveEscalation(runId: string, escalationId: string, answer: s escalation.answer = answer escalation.answeredBy = answeredBy escalation.resolvedAt = Date.now() + + const normalizedAnswer = answer.trim() + if (normalizedAnswer) { + const source: QuestionMemorySource = answeredBy.trim().startsWith("parent:") ? "parent" : "user" + const record = buildQuestionMemoryRecord({ + question: escalation.question, + answers: [normalizedAnswer], + source, + sourceSessionID: escalation.fromSessionId, + updatedAt: escalation.resolvedAt, + }) + if (record) { + writeScratchpadEntry(runId, { + key: createQuestionMemoryKey({ + question: record.question, + header: record.header, + }), + value: encodeQuestionMemoryValue(record), + source: "supervisor", + category: "decision", + createdAt: record.updatedAt, + }) + } + } + return true } diff --git a/packages/oneshot/src/orchestrator/runtime/workflow.ts b/packages/oneshot/src/orchestrator/runtime/workflow.ts index 81a0f3a..2f26aee 100644 --- a/packages/oneshot/src/orchestrator/runtime/workflow.ts +++ b/packages/oneshot/src/orchestrator/runtime/workflow.ts @@ -246,7 +246,6 @@ export async function oneshotOrchestrationWorkflow(input: OrchestrationWorkflowI const noWorkCanStart = running.size === 0 && snapshot.pendingCount > 0 - && snapshot.inProgressCount === 0 && snapshot.readyTaskIds.length === 0 const noRecoverablePendingPath = snapshot.pendingCount > 0 @@ -285,17 +284,20 @@ export async function oneshotOrchestrationWorkflow(input: OrchestrationWorkflowI state.lastSupervisorAt = Date.now() lastSupervisorAt = state.lastSupervisorAt state.stage = "dispatching" - state.pauseRequested = true - state.note = resolution && resolution.blockedTaskIds.length > 0 - ? `Scheduler stall recovered by blocking ${resolution.blockedTaskIds.length} pending task(s). Run paused for recovery/resume.` - : "Scheduler stalled with no launchable work. Run paused for recovery/resume." - break + if (resolution && resolution.blockedTaskIds.length > 0) { + state.pauseRequested = true + state.note = `Scheduler stall recovered by blocking ${resolution.blockedTaskIds.length} pending task(s). Run paused for recovery/resume.` + break + } + deadlockStreak = 0 + continue } deadlockStreak += 1 if (deadlockStreak === DEADLOCK_SUPERVISOR_THRESHOLD) { state.stage = "supervising" await workflowActivities.runSupervisorCycleActivity(input).catch(() => {}) + await workflowActivities.runSupervisorInterventionActivity(input).catch(() => {}) state.lastSupervisorAt = Date.now() lastSupervisorAt = state.lastSupervisorAt state.stage = "dispatching" @@ -312,11 +314,13 @@ export async function oneshotOrchestrationWorkflow(input: OrchestrationWorkflowI state.lastSupervisorAt = Date.now() lastSupervisorAt = state.lastSupervisorAt state.stage = "dispatching" - state.pauseRequested = true - state.note = resolution && resolution.blockedTaskIds.length > 0 - ? `Scheduler stall recovered by blocking ${resolution.blockedTaskIds.length} pending task(s). Run paused for recovery/resume.` - : "Scheduler stalled with no launchable work. Run paused for recovery/resume." - break + if (resolution && resolution.blockedTaskIds.length > 0) { + state.pauseRequested = true + state.note = `Scheduler stall recovered by blocking ${resolution.blockedTaskIds.length} pending task(s). Run paused for recovery/resume.` + break + } + deadlockStreak = 0 + continue } } else { deadlockStreak = 0 @@ -326,20 +330,20 @@ export async function oneshotOrchestrationWorkflow(input: OrchestrationWorkflowI state.supervisorNudges > 0 || snapshotDelta.failedCountIncreased const shouldRunSupervisorByBackstop = - snapshot.pendingCount > 0 + (snapshot.pendingCount > 0 || snapshot.inProgressCount > 0) && now - lastSupervisorAt >= SUPERVISOR_BACKSTOP_INTERVAL_MS && now - lastSnapshotSignalAt >= SUPERVISOR_STALL_BACKSTOP_MS const shouldRunSupervisor = (shouldRunSupervisorByEvent && now - lastSupervisorAt >= SUPERVISOR_EVENT_MIN_INTERVAL_MS) || shouldRunSupervisorByBackstop - const shouldRunInterventionByEvent = running.size > 0 && ( + const shouldRunInterventionByEvent = snapshot.inProgressCount > 0 && ( snapshotDelta.progressChanged || snapshotDelta.topologyChanged || launchedThisIteration > 0 ) const shouldRunInterventionByBackstop = - running.size > 0 + snapshot.inProgressCount > 0 && now - lastInterventionAt >= SUPERVISOR_INTERVENTION_BACKSTOP_INTERVAL_MS && now - lastSnapshotSignalAt >= SUPERVISOR_STALL_BACKSTOP_MS const shouldRunIntervention = @@ -486,7 +490,6 @@ export async function oneshotAgentTaskWorkflow( const noWorkCanStart = running.size === 0 && snapshot.pendingCount > 0 - && snapshot.inProgressCount === 0 && snapshot.readyTaskIds.length === 0 const noRecoverablePendingPath = snapshot.pendingCount > 0 @@ -511,22 +514,34 @@ export async function oneshotAgentTaskWorkflow( continue } - await workflowActivities.markUnschedulableTasksActivity({ + const resolution = await workflowActivities.markUnschedulableTasksActivity({ input: input.orchestration, parentTaskId: input.taskId, reason: `Subtask deadlock detected under parent task ${input.taskId}: no recoverable dependency path remained.`, - }).catch(() => {}) - break + }).catch(() => undefined) + if (resolution && resolution.blockedTaskIds.length > 0) { + break + } + deadlockStreak = 0 + continue } deadlockStreak += 1 + if (deadlockStreak === DEADLOCK_SUPERVISOR_THRESHOLD) { + await workflowActivities.runSupervisorCycleActivity(input.orchestration).catch(() => {}) + await workflowActivities.runSupervisorInterventionActivity(input.orchestration).catch(() => {}) + } if (deadlockStreak >= DEADLOCK_FAIL_THRESHOLD) { - await workflowActivities.markUnschedulableTasksActivity({ + const resolution = await workflowActivities.markUnschedulableTasksActivity({ input: input.orchestration, parentTaskId: input.taskId, reason: `Subtask deadlock detected under parent task ${input.taskId}: no launchable work remained.`, - }).catch(() => {}) - break + }).catch(() => undefined) + if (resolution && resolution.blockedTaskIds.length > 0) { + break + } + deadlockStreak = 0 + continue } } else { deadlockStreak = 0 diff --git a/packages/oneshot/src/orchestrator/types.ts b/packages/oneshot/src/orchestrator/types.ts index a99bc16..70e599b 100644 --- a/packages/oneshot/src/orchestrator/types.ts +++ b/packages/oneshot/src/orchestrator/types.ts @@ -444,6 +444,7 @@ export namespace Orchestrator { provider: z.string(), model: z.string(), models: RunModels.optional(), + allowInteractiveQuestions: z.boolean().optional(), tokenBudget: z.number(), tokenKillThreshold: z.number(), globalTokenUsage: z.object({ diff --git a/packages/oneshot/src/question/index.ts b/packages/oneshot/src/question/index.ts index 481daf7..101e73e 100644 --- a/packages/oneshot/src/question/index.ts +++ b/packages/oneshot/src/question/index.ts @@ -102,6 +102,7 @@ export namespace Question { sessionID: string questions: Info[] tool?: { messageID: string; callID: string } + abort?: AbortSignal }): Promise { const s = state() const id = Identifier.ascending("question") @@ -109,17 +110,51 @@ export namespace Question { log.info("asking", { id, questions: input.questions.length }) return new Promise((resolve, reject) => { + if (input.abort?.aborted) { + reject(new AbortedError()) + return + } + const info: Request = { id, sessionID: input.sessionID, questions: input.questions, tool: input.tool, } + + const cleanup = () => { + if (!input.abort) return + input.abort.removeEventListener("abort", onAbort) + } + + const onAbort = () => { + const existing = s.pending[id] + if (!existing) return + delete s.pending[id] + cleanup() + Bus.publish(Event.Rejected, { + sessionID: info.sessionID, + requestID: info.id, + }) + existing.reject(new AbortedError()) + } + s.pending[id] = { info, - resolve, - reject, + resolve: (answers) => { + cleanup() + resolve(answers) + }, + reject: (error) => { + cleanup() + reject(error) + }, } + + if (input.abort) { + input.abort.addEventListener("abort", onAbort, { once: true }) + } + Bus.publish(Event.Asked, info) }) } @@ -169,6 +204,12 @@ export namespace Question { } } + export class AbortedError extends Error { + constructor() { + super("The question request was aborted") + } + } + export async function list() { const s = state() return Object.values(s.pending).map((item) => item.info) diff --git a/packages/oneshot/src/server/routes/orchestrator.ts b/packages/oneshot/src/server/routes/orchestrator.ts index 5aac5e9..002db34 100644 --- a/packages/oneshot/src/server/routes/orchestrator.ts +++ b/packages/oneshot/src/server/routes/orchestrator.ts @@ -91,6 +91,10 @@ const StartRunInput = z.object({ prompt: z.string().min(10).describe("The root prompt to orchestrate"), providerID: z.string().describe("AI provider ID (e.g. anthropic, openai)"), modelID: z.string().describe("Model ID (e.g. claude-3-5-sonnet-20241022)"), + allowInteractiveQuestions: z + .boolean() + .optional() + .describe("Allow interactive question prompts during orchestration planning/execution"), models: z .object({ master: RunModelSelectionInput.optional(), @@ -512,6 +516,7 @@ export const OrchestratorRoutes = lazy(() => providerID: resolvedMasterModel.providerID, modelID: resolvedMasterModel.modelID, models: resolvedRoleOverrides, + allowInteractiveQuestions: body.allowInteractiveQuestions, tokenBudget: body.tokenBudget, maxParallelAgents: body.maxParallelAgents, maxRecursionDepth: body.maxRecursionDepth, diff --git a/packages/oneshot/src/tool/question.ts b/packages/oneshot/src/tool/question.ts index f6d083c..f229fcd 100644 --- a/packages/oneshot/src/tool/question.ts +++ b/packages/oneshot/src/tool/question.ts @@ -3,6 +3,16 @@ import { Tool } from "./tool" import { Question } from "../question" import DESCRIPTION from "./question.txt" import { Log } from "../util/log" +import { + buildQuestionMemoryRecord, + createQuestionMemoryKey, + encodeQuestionMemoryValue, + extractQuestionMemoryFromScratchpad, + normalizeQuestionMemoryText, + type QuestionMemoryRecord, + type QuestionMemorySource, +} from "../orchestrator/runtime/question-memory" +import { getRun, getRunIDForSession, getScratchpad, writeScratchpadEntry } from "../orchestrator/runtime/store" const log = Log.create({ service: "tool.question" }) @@ -19,7 +29,7 @@ type ParentResolutionInput = { } type ResolutionSource = { - source: "parent" | "user" + source: "parent" | "user" | "memory" sessionID?: string } @@ -36,6 +46,7 @@ type ResolutionDependencies = { sessionID: string questions: ToolQuestion[] tool?: ToolCallRef + abort?: AbortSignal }): Promise } @@ -60,7 +71,7 @@ function defaultResolutionDependencies(): ResolutionDependencies { return { getParentSessionChain: resolveParentSessionChain, answerFromParent: attemptParentQuestionResolution, - askUser: Question.ask, + askUser: (input) => Question.ask(input), } } @@ -72,14 +83,84 @@ function parseBooleanFlag(value: string | undefined, defaultValue: boolean): boo return defaultValue } -function allowOrchestratorPlannerQuestions(): boolean { - return parseBooleanFlag(process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS, true) +function allowInteractiveQuestionsByDefault(): boolean { + return parseBooleanFlag(process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS, false) } +function allowInteractiveQuestionsForRun(runId: string | undefined): boolean { + if (runId) { + const run = getRun(runId) + if (typeof run?.allowInteractiveQuestions === "boolean") { + return run.allowInteractiveQuestions + } + } + return allowInteractiveQuestionsByDefault() +} + +function parseQuestionTimeoutMs(value: string | undefined): number { + if (!value?.trim()) return 20_000 + const parsed = Number(value) + if (!Number.isFinite(parsed)) return 20_000 + return Math.max(1_000, Math.min(120_000, Math.floor(parsed))) +} + +const ORCHESTRATOR_QUESTION_TIMEOUT_MS = parseQuestionTimeoutMs( + process.env.ONESHOT_ORCHESTRATOR_QUESTION_TIMEOUT_MS, +) + function allowHierarchicalQuestionRouting(): boolean { return parseBooleanFlag(process.env.ONESHOT_SUBAGENT_QUESTION_PARENT_CHAIN, true) } +function createAbortSignalWithTimeout(input: { + base: AbortSignal + timeoutMs?: number +}): { + signal: AbortSignal + cleanup(): void + timedOut(): boolean +} { + const timeoutMs = input.timeoutMs + if (!timeoutMs || timeoutMs <= 0) { + return { + signal: input.base, + cleanup: () => {}, + timedOut: () => false, + } + } + + const controller = new AbortController() + let didTimeout = false + let settled = false + + const abortFromBase = () => { + if (settled) return + settled = true + controller.abort(input.base.reason) + } + if (input.base.aborted) { + abortFromBase() + } else { + input.base.addEventListener("abort", abortFromBase, { once: true }) + } + + const timer = setTimeout(() => { + if (settled) return + settled = true + didTimeout = true + controller.abort(new Error("orchestrator question timeout")) + }, timeoutMs) + + return { + signal: controller.signal, + cleanup: () => { + clearTimeout(timer) + input.base.removeEventListener("abort", abortFromBase) + }, + timedOut: () => didTimeout, + } +} + function normalizeSelectionValues(input: Question.Answer | undefined): string[] { if (!Array.isArray(input)) return [] const normalized = [] as string[] @@ -109,6 +190,186 @@ function isValidParentAnswer(answer: Question.Answer | undefined, question: Tool return true } +const QUESTION_MATCH_STOPWORDS = new Set([ + "a", + "an", + "and", + "are", + "as", + "at", + "be", + "by", + "for", + "from", + "how", + "in", + "is", + "it", + "of", + "on", + "or", + "our", + "should", + "that", + "the", + "this", + "to", + "we", + "what", + "which", + "with", +]) + +type MemoryCandidate = { + answer: Question.Answer + mappedToOptions: boolean + confidence: number + memory: QuestionMemoryRecord +} + +function tokenizeQuestionMatch(value: string): string[] { + return normalizeQuestionMemoryText(value) + .split(" ") + .map((token) => { + if (token.length > 4 && token.endsWith("ies")) return `${token.slice(0, -3)}y` + if (token.length > 3 && token.endsWith("s") && !token.endsWith("ss")) return token.slice(0, -1) + return token + }) + .filter((token) => token.length > 1 && !QUESTION_MATCH_STOPWORDS.has(token)) +} + +function jaccardSimilarity(left: string[], right: string[]): number { + if (left.length === 0 || right.length === 0) return 0 + const leftSet = new Set(left) + const rightSet = new Set(right) + let intersection = 0 + for (const token of leftSet) { + if (rightSet.has(token)) intersection += 1 + } + const union = new Set([...leftSet, ...rightSet]).size + return union === 0 ? 0 : intersection / union +} + +function computeMemoryQuestionSimilarity(question: ToolQuestion, memory: QuestionMemoryRecord): number { + const normalizedCurrent = normalizeQuestionMemoryText([question.header, question.question].join(" ")) + if (!normalizedCurrent || !memory.normalizedQuestion) return 0 + if (normalizedCurrent === memory.normalizedQuestion) return 1 + + const shorterLength = Math.min(normalizedCurrent.length, memory.normalizedQuestion.length) + const longerLength = Math.max(normalizedCurrent.length, memory.normalizedQuestion.length) + if ( + shorterLength >= 12 + && (normalizedCurrent.includes(memory.normalizedQuestion) || memory.normalizedQuestion.includes(normalizedCurrent)) + && shorterLength / longerLength >= 0.7 + ) { + return 0.92 + } + + const currentTokens = tokenizeQuestionMatch(normalizedCurrent) + const memoryTokens = tokenizeQuestionMatch(memory.normalizedQuestion) + return jaccardSimilarity(currentTokens, memoryTokens) +} + +function mapAnswerToOptionLabel(answer: string, options: ToolQuestion["options"]): string | undefined { + const normalizedAnswer = normalizeQuestionMemoryText(answer) + if (!normalizedAnswer) return undefined + + let bestMatch: { label: string; score: number } | undefined + for (const option of options) { + const normalizedLabel = normalizeQuestionMemoryText(option.label) + if (!normalizedLabel) continue + if (normalizedLabel === normalizedAnswer) return option.label + + if ( + Math.min(normalizedLabel.length, normalizedAnswer.length) >= 4 + && (normalizedLabel.includes(normalizedAnswer) || normalizedAnswer.includes(normalizedLabel)) + ) { + return option.label + } + + const score = jaccardSimilarity(tokenizeQuestionMatch(normalizedLabel), tokenizeQuestionMatch(normalizedAnswer)) + if (!bestMatch || score > bestMatch.score) { + bestMatch = { label: option.label, score } + } + } + + if (bestMatch && bestMatch.score >= 0.7) return bestMatch.label + return undefined +} + +function adaptMemoryAnswersToQuestion(memory: QuestionMemoryRecord, question: ToolQuestion): { + answer: Question.Answer + mappedToOptions: boolean +} | undefined { + const rememberedAnswers = normalizeSelectionValues(memory.answers) + if (rememberedAnswers.length === 0) return undefined + + const mappedAnswers = rememberedAnswers + .map((value) => mapAnswerToOptionLabel(value, question.options)) + .filter((value): value is string => Boolean(value)) + + const uniqueMapped = [...new Set(mappedAnswers)] + if (uniqueMapped.length > 0) { + return { + answer: question.multiple === true ? uniqueMapped : [uniqueMapped[0]], + mappedToOptions: true, + } + } + + if (!questionAllowsCustom(question)) return undefined + return { + answer: question.multiple === true ? rememberedAnswers : [rememberedAnswers[0]], + mappedToOptions: false, + } +} + +function resolveAnswersFromSharedMemory(input: { + questions: ToolQuestion[] + memoryEntries: QuestionMemoryRecord[] +}): { + answers: Array + byQuestion: ResolutionSource[] +} { + const answers = Array(input.questions.length).fill(undefined) + const byQuestion = Array(input.questions.length).fill({ source: "user" }) + + for (let index = 0; index < input.questions.length; index++) { + const question = input.questions[index] + if (!question) continue + + let best: MemoryCandidate | undefined + for (const memory of input.memoryEntries) { + const similarity = computeMemoryQuestionSimilarity(question, memory) + if (similarity < 0.65) continue + + const adapted = adaptMemoryAnswersToQuestion(memory, question) + if (!adapted || adapted.answer.length === 0) continue + + let confidence = similarity + if (adapted.mappedToOptions) confidence += 0.1 + if (memory.source === "user") confidence += 0.04 + const candidate: MemoryCandidate = { + answer: adapted.answer, + mappedToOptions: adapted.mappedToOptions, + confidence, + memory, + } + if (!best || candidate.confidence > best.confidence) best = candidate + } + + if (!best) continue + if (best.confidence < 0.92 && (!best.mappedToOptions || best.confidence < 0.76)) continue + + answers[index] = best.answer + byQuestion[index] = { + source: "memory", + sessionID: best.memory.sourceSessionID, + } + } + + return { answers, byQuestion } +} + function buildParentResolutionPrompt(input: { childSessionID: string parentSessionID: string @@ -349,6 +610,7 @@ export async function resolveQuestionAnswersWithHierarchy(input: { tool?: ToolCallRef abort: AbortSignal allowHierarchy: boolean + questionTimeoutMs?: number }, deps: ResolutionDependencies = defaultResolutionDependencies()): Promise { const answers = Array(input.questions.length).fill(undefined) const byQuestion = Array(input.questions.length).fill({ source: "user" }) @@ -390,11 +652,34 @@ export async function resolveQuestionAnswersWithHierarchy(input: { if (unresolvedIndexes.length > 0) { const unresolvedQuestions = unresolvedIndexes.map((index) => input.questions[index]).filter((question) => Boolean(question)) - const userAnswers = await deps.askUser({ - sessionID: input.sessionID, - questions: unresolvedQuestions, - tool: input.tool, + const askControl = createAbortSignalWithTimeout({ + base: input.abort, + timeoutMs: input.questionTimeoutMs, }) + let userAnswers: Question.Answer[] + try { + userAnswers = await deps.askUser({ + sessionID: input.sessionID, + questions: unresolvedQuestions, + tool: input.tool, + abort: askControl.signal, + }) + } catch (error) { + const recoverable = + askControl.timedOut() + || error instanceof Question.RejectedError + || error instanceof Question.AbortedError + if (!recoverable) throw error + userAnswers = unresolvedQuestions.map(() => []) + log.warn("question resolution fallback applied", { + sessionID: input.sessionID, + unresolvedQuestionCount: unresolvedIndexes.length, + timedOut: askControl.timedOut(), + error: error instanceof Error ? error.message : String(error), + }) + } finally { + askControl.cleanup() + } for (let offset = 0; offset < unresolvedIndexes.length; offset++) { const index = unresolvedIndexes[offset] @@ -418,11 +703,13 @@ export const QuestionTool = Tool.define("question", { }), async execute(params, ctx) { const questionCount = params.questions.length + const runId = getRunIDForSession(ctx.sessionID) + const interactiveQuestionsEnabled = allowInteractiveQuestionsForRun(runId) - if (ctx.agent === "orchestrator-planner" && !allowOrchestratorPlannerQuestions()) { + if (ctx.agent === "orchestrator-planner" && !interactiveQuestionsEnabled) { return { title: `Skipped ${questionCount} question${questionCount > 1 ? "s" : ""}`, - output: "Interactive questions are disabled for orchestrator planner sessions. Continue by stating explicit assumptions and call register_plan.", + output: "Interactive questions are disabled for this orchestration run. Continue by stating explicit assumptions and call register_plan.", metadata: { skipped: true, reason: "orchestrator_planner_noninteractive", @@ -431,18 +718,87 @@ export const QuestionTool = Tool.define("question", { parentSessionsTried: [] as string[], answeredByParentCount: 0, answeredByUserCount: 0, + answeredByMemoryCount: 0, }, } } - const resolution = await resolveQuestionAnswersWithHierarchy({ - sessionID: ctx.sessionID, + const memoryEntries = runId + ? extractQuestionMemoryFromScratchpad(getScratchpad(runId)) + : [] + const memoryResolution = resolveAnswersFromSharedMemory({ questions: params.questions, - tool: ctx.callID ? { messageID: ctx.messageID, callID: ctx.callID } : undefined, - abort: ctx.abort, - allowHierarchy: allowHierarchicalQuestionRouting(), + memoryEntries, }) - const answers = resolution.answers + + const answers = Array(questionCount).fill([]) + const byQuestion = Array(questionCount).fill({ source: "user" }) + + for (let index = 0; index < questionCount; index++) { + const answer = memoryResolution.answers[index] + if (!answer || answer.length === 0) continue + answers[index] = answer + byQuestion[index] = memoryResolution.byQuestion[index] ?? { source: "memory" } + } + + const unresolvedIndexes = answers + .map((answer, index) => ({ index, unresolved: !answer || answer.length === 0 })) + .filter((entry) => entry.unresolved) + .map((entry) => entry.index) + + const hierarchicalResolution = unresolvedIndexes.length > 0 + ? await resolveQuestionAnswersWithHierarchy({ + sessionID: ctx.sessionID, + questions: unresolvedIndexes + .map((index) => params.questions[index]) + .filter((question): question is ToolQuestion => Boolean(question)), + tool: ctx.callID ? { messageID: ctx.messageID, callID: ctx.callID } : undefined, + abort: ctx.abort, + allowHierarchy: allowHierarchicalQuestionRouting(), + questionTimeoutMs: runId ? ORCHESTRATOR_QUESTION_TIMEOUT_MS : undefined, + }) + : { + answers: [] as Question.Answer[], + byQuestion: [] as ResolutionSource[], + parentSessionsTried: [] as string[], + } + + for (let offset = 0; offset < unresolvedIndexes.length; offset++) { + const index = unresolvedIndexes[offset] + answers[index] = hierarchicalResolution.answers[offset] ?? [] + byQuestion[index] = hierarchicalResolution.byQuestion[offset] ?? { source: "user" } + } + + if (runId) { + for (let index = 0; index < questionCount; index++) { + const question = params.questions[index] + const answer = normalizeSelectionValues(answers[index]) + if (!question || answer.length === 0) continue + const source = byQuestion[index]?.source ?? "user" + if (source === "memory") continue + const recordSource: QuestionMemorySource = source === "parent" + ? "parent" + : "user" + const record = buildQuestionMemoryRecord({ + question: question.question, + header: question.header, + answers: answer, + source: recordSource, + sourceSessionID: byQuestion[index]?.sessionID, + }) + if (!record) continue + writeScratchpadEntry(runId, { + key: createQuestionMemoryKey({ + question: record.question, + header: record.header, + }), + value: encodeQuestionMemoryValue(record), + source: "question-tool", + category: "decision", + createdAt: record.updatedAt, + }) + } + } function format(answer: Question.Answer | undefined) { if (!answer?.length) return "Unanswered" @@ -450,20 +806,23 @@ export const QuestionTool = Tool.define("question", { } const formatted = params.questions.map((q, i) => `"${q.question}"="${format(answers[i])}"`).join(", ") - const answeredByParentCount = resolution.byQuestion.filter((entry) => entry.source === "parent").length - const answeredByUserCount = resolution.byQuestion.filter((entry) => entry.source === "user").length + const answeredByParentCount = byQuestion.filter((entry) => entry.source === "parent").length + const answeredByUserCount = byQuestion.filter((entry) => entry.source === "user").length + const answeredByMemoryCount = byQuestion.filter((entry) => entry.source === "memory").length + const titleVerb = answeredByUserCount > 0 ? "Asked" : "Resolved" return { - title: `Asked ${questionCount} question${questionCount > 1 ? "s" : ""}`, + title: `${titleVerb} ${questionCount} question${questionCount > 1 ? "s" : ""}`, output: `Questions were resolved: ${formatted}. Continue using these answers.`, metadata: { skipped: false, reason: "answered", questionCount, answers, - parentSessionsTried: resolution.parentSessionsTried, + parentSessionsTried: hierarchicalResolution.parentSessionsTried, answeredByParentCount, answeredByUserCount, + answeredByMemoryCount, }, } }, diff --git a/packages/oneshot/test/orchestrator/mark-unschedulable.test.ts b/packages/oneshot/test/orchestrator/mark-unschedulable.test.ts new file mode 100644 index 0000000..f3a0f8a --- /dev/null +++ b/packages/oneshot/test/orchestrator/mark-unschedulable.test.ts @@ -0,0 +1,251 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test" +import type { Orchestrator } from "../../src/orchestrator/types" +import { markUnschedulablePendingTasks } from "../../src/orchestrator/runtime/langgraph-engine" +import { clearAllActiveRuns, getGraph, getRun, setActiveRun } from "../../src/orchestrator/runtime/store" +import { tmpdir } from "../fixture/fixture" + +function createRun(input: { runId: string; graphId: string }): Orchestrator.Run { + const now = Date.now() + return { + id: input.runId, + name: "Unschedulable marking test", + rootPrompt: "Mark unschedulable tasks safely", + graphId: input.graphId, + status: "active", + provider: "test", + model: "test", + tokenBudget: 100_000, + tokenKillThreshold: 90_000, + globalTokenUsage: { + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + runtimeStats: { + totalDispatches: 0, + budgetThrottleEvents: 0, + deadlockEvents: 0, + }, + localDeploy: { + status: "idle", + runCount: 0, + logs: [], + stages: [], + requests: [], + }, + maxRecursionDepth: 2, + maxParallelAgents: 4, + time: { + created: now, + updated: now, + }, + } +} + +function createWorkflowInput(input: { + runId: string + projectRoot: string +}) { + return { + runId: input.runId, + rootPrompt: "Mark unschedulable tasks safely", + providerID: "test", + modelID: "test", + projectRoot: input.projectRoot, + tokenBudget: 100_000, + maxParallelAgents: 4, + maxRecursionDepth: 2, + } +} + +beforeEach(() => { + clearAllActiveRuns() +}) + +afterEach(() => { + clearAllActiveRuns() +}) + +describe("markUnschedulablePendingTasks", () => { + test("blocks only pending tasks with unrecoverable dependency paths", async () => { + await using tmp = await tmpdir({ git: true }) + const runId = `run-${Date.now()}-unschedulable-unrecoverable` + const graphId = `graph-${Date.now()}-unschedulable-unrecoverable` + const run = createRun({ runId, graphId }) + const now = Date.now() + const graph: Orchestrator.TaskGraph = { + id: graphId, + orchestrationId: runId, + rootPrompt: "Mark unschedulable tasks safely", + tasks: { + success: { + id: "success", + name: "Success root", + description: "Completed dependency", + status: "completed", + kind: "implementation", + dependencies: [], + dependents: ["recoverable"], + subtaskIds: [], + time: { + created: now - 5_000, + completed: now - 4_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + failed: { + id: "failed", + name: "Failed root", + description: "Failed dependency", + status: "failed", + kind: "implementation", + dependencies: [], + dependents: ["unrecoverable"], + subtaskIds: [], + error: "Task failed", + time: { + created: now - 5_000, + completed: now - 3_500, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + recoverable: { + id: "recoverable", + name: "Recoverable pending", + description: "Depends on completed task", + status: "pending", + kind: "implementation", + dependencies: ["success"], + dependents: [], + subtaskIds: [], + time: { + created: now - 3_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + unrecoverable: { + id: "unrecoverable", + name: "Unrecoverable pending", + description: "Depends on failed task", + status: "pending", + kind: "implementation", + dependencies: ["failed"], + dependents: [], + subtaskIds: [], + time: { + created: now - 3_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + }, + adjacencyList: { + success: ["recoverable"], + failed: ["unrecoverable"], + recoverable: [], + unrecoverable: [], + }, + status: "running", + progress: 30, + replanCount: 0, + maxReplanCycles: 3, + validationRetries: 0, + maxValidationRetries: 2, + scratchpad: [], + time: { + created: now - 6_000, + }, + } + setActiveRun(run, graph, tmp.path) + + const result = await markUnschedulablePendingTasks(createWorkflowInput({ + runId, + projectRoot: tmp.path, + })) + + expect(result.blockedTaskIds).toEqual(["unrecoverable"]) + expect(result.pendingCount).toBe(2) + expect(getGraph(graphId)?.tasks.recoverable.status).toBe("pending") + expect(getGraph(graphId)?.tasks.unrecoverable.status).toBe("blocked") + expect(getRun(runId)?.runtimeStats?.deadlockEvents).toBe(1) + }) + + test("does not block pending tasks when dependency paths are still recoverable", async () => { + await using tmp = await tmpdir({ git: true }) + const runId = `run-${Date.now()}-unschedulable-recoverable` + const graphId = `graph-${Date.now()}-unschedulable-recoverable` + const run = createRun({ runId, graphId }) + const now = Date.now() + const graph: Orchestrator.TaskGraph = { + id: graphId, + orchestrationId: runId, + rootPrompt: "Mark unschedulable tasks safely", + tasks: { + first: { + id: "first", + name: "First pending", + description: "Pending root task", + status: "pending", + kind: "implementation", + dependencies: [], + dependents: ["second"], + subtaskIds: [], + time: { + created: now - 2_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + second: { + id: "second", + name: "Second pending", + description: "Pending task with recoverable dependency", + status: "pending", + kind: "implementation", + dependencies: ["first"], + dependents: [], + subtaskIds: [], + time: { + created: now - 1_500, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + }, + adjacencyList: { + first: ["second"], + second: [], + }, + status: "running", + progress: 0, + replanCount: 0, + maxReplanCycles: 3, + validationRetries: 0, + maxValidationRetries: 2, + scratchpad: [], + time: { + created: now - 3_000, + }, + } + setActiveRun(run, graph, tmp.path) + + const result = await markUnschedulablePendingTasks(createWorkflowInput({ + runId, + projectRoot: tmp.path, + })) + + expect(result.blockedTaskIds).toEqual([]) + expect(getGraph(graphId)?.tasks.first.status).toBe("pending") + expect(getGraph(graphId)?.tasks.second.status).toBe("pending") + expect(getRun(runId)?.runtimeStats?.deadlockEvents).toBe(1) + }) +}) diff --git a/packages/oneshot/test/orchestrator/runtime-store.test.ts b/packages/oneshot/test/orchestrator/runtime-store.test.ts index 9915a22..6eebd1a 100644 --- a/packages/oneshot/test/orchestrator/runtime-store.test.ts +++ b/packages/oneshot/test/orchestrator/runtime-store.test.ts @@ -1,13 +1,69 @@ import { beforeEach, describe, expect, test } from "bun:test" +import type { Orchestrator } from "../../src/orchestrator/types" import { + addEscalation, abortTaskExecution, abortRunExecution, clearAllActiveRuns, clearRunAbortController, clearTaskAbortController, + getScratchpad, registerRunAbortController, registerTaskAbortController, + resolveEscalation, + setActiveRun, } from "../../src/orchestrator/runtime/store" +import { + decodeQuestionMemoryValue, + isQuestionMemoryScratchpadEntry, +} from "../../src/orchestrator/runtime/question-memory" + +function createRun(runId: string, graphId: string): Orchestrator.Run { + const now = Date.now() + return { + id: runId, + name: "runtime-store-test", + rootPrompt: "test", + graphId, + status: "active", + provider: "anthropic", + model: "claude-sonnet-4-5", + tokenBudget: 100_000, + tokenKillThreshold: 90_000, + globalTokenUsage: { + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + maxRecursionDepth: 3, + maxParallelAgents: 4, + time: { + created: now, + updated: now, + }, + } +} + +function createGraph(runId: string, graphId: string): Orchestrator.TaskGraph { + const now = Date.now() + return { + id: graphId, + orchestrationId: runId, + rootPrompt: "test", + tasks: {}, + adjacencyList: {}, + status: "running", + progress: 0, + replanCount: 0, + maxReplanCycles: 3, + validationRetries: 0, + maxValidationRetries: 2, + scratchpad: [], + time: { + created: now, + }, + } +} describe("orchestrator runtime store abort controllers", () => { beforeEach(() => { @@ -87,4 +143,33 @@ describe("orchestrator runtime store abort controllers", () => { expect(first.signal.aborted).toBe(false) expect(second.signal.aborted).toBe(true) }) + + test("resolveEscalation writes answered questions into shared scratchpad memory", () => { + const runId = "run-escalation-memory" + const graphId = "graph-escalation-memory" + const run = createRun(runId, graphId) + const graph = createGraph(runId, graphId) + setActiveRun(run, graph) + + addEscalation(runId, { + id: "esc-1", + runId, + fromTaskId: "task-1", + fromAgentId: "agent-1", + fromSessionId: "session-parent", + question: "Which cache backend should we use for sessions?", + priority: "medium", + status: "pending", + createdAt: Date.now(), + }) + + expect(resolveEscalation(runId, "esc-1", "Redis", "user")).toBe(true) + + const memoryEntry = getScratchpad(runId).find((entry) => isQuestionMemoryScratchpadEntry(entry)) + expect(memoryEntry).toBeDefined() + const parsed = memoryEntry ? decodeQuestionMemoryValue(memoryEntry.value) : undefined + expect(parsed?.question).toBe("Which cache backend should we use for sessions?") + expect(parsed?.answers).toEqual(["Redis"]) + expect(parsed?.source).toBe("user") + }) }) diff --git a/packages/oneshot/test/orchestrator/supervisor-cancelled-recovery.test.ts b/packages/oneshot/test/orchestrator/supervisor-cancelled-recovery.test.ts index cf174d3..380794d 100644 --- a/packages/oneshot/test/orchestrator/supervisor-cancelled-recovery.test.ts +++ b/packages/oneshot/test/orchestrator/supervisor-cancelled-recovery.test.ts @@ -159,4 +159,167 @@ describe("orchestrator supervisor cancelled-task recovery", () => { expect(snapshot.pendingRecoverableCount).toBeGreaterThan(0) expect(snapshot.readyTaskIds).toContain("batch-ui") }) + + test("auto-requeues deadlock-blocked tasks when dependency path is recoverable", async () => { + await using tmp = await tmpdir({ git: true }) + const runId = `run-${Date.now()}-deadlock-recovery` + const graphId = `graph-${Date.now()}-deadlock-recovery` + const run = createRun({ runId, graphId }) + const now = Date.now() + const graph: Orchestrator.TaskGraph = { + id: graphId, + orchestrationId: runId, + rootPrompt: "Recover deadlock-blocked tasks", + tasks: { + implementation: { + id: "implementation", + name: "Implementation", + description: "Implement feature", + status: "completed", + kind: "implementation", + dependencies: [], + dependents: ["validation"], + subtaskIds: [], + time: { + created: now - 4_000, + completed: now - 2_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + validation: { + id: "validation", + name: "Validation", + description: "Run integration validation", + status: "blocked", + kind: "validation", + dependencies: ["implementation"], + dependents: [], + subtaskIds: [], + error: "Deadlock detected: pending tasks are unschedulable and no work can be launched.", + time: { + created: now - 3_000, + completed: now - 1_500, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + }, + adjacencyList: { + implementation: ["validation"], + validation: [], + }, + status: "running", + progress: 60, + replanCount: 0, + maxReplanCycles: 3, + validationRetries: 0, + maxValidationRetries: 2, + scratchpad: [], + time: { + created: now - 6_000, + }, + } + setActiveRun(run, graph, tmp.path) + + const workflowInput = createWorkflowInput({ + runId, + projectRoot: tmp.path, + }) + await runSupervisorPhase(workflowInput) + + const nextGraph = getGraph(graphId) + const recoveredTask = nextGraph?.tasks.validation + expect(recoveredTask?.status).toBe("pending") + expect(recoveredTask?.error).toBeUndefined() + const recoveredMetadata = (recoveredTask?.metadata ?? {}) as Record + expect(recoveredMetadata._deadlockRecoveryRetries).toBe(1) + + const snapshot = await getExecutionSnapshot(workflowInput) + expect(snapshot.readyTaskIds).toContain("validation") + }) + + test("auto-requeues supervisor-terminated failed tasks using replacement-agent mode", async () => { + await using tmp = await tmpdir({ git: true }) + const runId = `run-${Date.now()}-terminated-recovery` + const graphId = `graph-${Date.now()}-terminated-recovery` + const run = createRun({ runId, graphId }) + const now = Date.now() + const graph: Orchestrator.TaskGraph = { + id: graphId, + orchestrationId: runId, + rootPrompt: "Recover supervisor-terminated tasks", + tasks: { + scaffold: { + id: "scaffold", + name: "Project scaffold", + description: "Scaffold the app", + status: "failed", + kind: "implementation", + dependencies: [], + dependents: ["validate"], + subtaskIds: [], + error: "Terminated by supervisor: Output unchanged for 10 supervisor checks over 600s", + time: { + created: now - 8_000, + started: now - 7_000, + completed: now - 6_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + validate: { + id: "validate", + name: "Validation", + description: "Validate scaffolded app", + status: "pending", + kind: "validation", + dependencies: ["scaffold"], + dependents: [], + subtaskIds: [], + time: { + created: now - 5_000, + }, + retryCount: 0, + maxRetries: 2, + metadata: {}, + }, + }, + adjacencyList: { + scaffold: ["validate"], + validate: [], + }, + status: "running", + progress: 35, + replanCount: 0, + maxReplanCycles: 3, + validationRetries: 0, + maxValidationRetries: 2, + scratchpad: [], + time: { + created: now - 9_000, + }, + } + setActiveRun(run, graph, tmp.path) + + const workflowInput = createWorkflowInput({ + runId, + projectRoot: tmp.path, + }) + await runSupervisorPhase(workflowInput) + + const nextGraph = getGraph(graphId) + const recoveredTask = nextGraph?.tasks.scaffold + expect(recoveredTask?.status).toBe("pending") + expect(recoveredTask?.error).toBeUndefined() + const recoveredMetadata = (recoveredTask?.metadata ?? {}) as Record + expect(recoveredMetadata._supervisorRecoveryRetries).toBe(1) + expect(recoveredMetadata._retryMode).toBe("replacement_agent") + + const snapshot = await getExecutionSnapshot(workflowInput) + expect(snapshot.readyTaskIds).toContain("scaffold") + }) }) diff --git a/packages/oneshot/test/question/question.test.ts b/packages/oneshot/test/question/question.test.ts index 4fb958b..3466d5b 100644 --- a/packages/oneshot/test/question/question.test.ts +++ b/packages/oneshot/test/question/question.test.ts @@ -210,6 +210,36 @@ test("reject - does nothing for unknown requestID", async () => { }) }) +test("ask - abort signal rejects and clears pending request", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const controller = new AbortController() + const askPromise = Question.ask({ + sessionID: "ses_abort", + questions: [ + { + question: "Proceed?", + header: "Abort", + options: [{ label: "Yes", description: "Proceed" }], + }, + ], + abort: controller.signal, + }) + + const pending = await Question.list() + expect(pending.length).toBe(1) + + controller.abort() + + await expect(askPromise).rejects.toBeInstanceOf(Question.AbortedError) + const pendingAfter = await Question.list() + expect(pendingAfter.length).toBe(0) + }, + }) +}) + // multiple questions tests test("ask - handles multiple questions", async () => { diff --git a/packages/oneshot/test/tool/question.test.ts b/packages/oneshot/test/tool/question.test.ts index ccbbaf9..5babf61 100644 --- a/packages/oneshot/test/tool/question.test.ts +++ b/packages/oneshot/test/tool/question.test.ts @@ -2,6 +2,8 @@ import { describe, expect, test, spyOn, beforeEach, afterEach } from "bun:test" import { z } from "zod" import { QuestionTool, resolveQuestionAnswersWithHierarchy } from "../../src/tool/question" import * as QuestionModule from "../../src/question" +import type { Orchestrator } from "../../src/orchestrator/types" +import { addRunSessionID, clearAllActiveRuns, setActiveRun } from "../../src/orchestrator/runtime/store" const ctx = { sessionID: "test-session", @@ -14,17 +16,75 @@ const ctx = { ask: async () => {}, } +function createRun( + runId: string, + graphId: string, + options: { allowInteractiveQuestions?: boolean } = {}, +): Orchestrator.Run { + const now = Date.now() + return { + id: runId, + name: "question-tool-test", + rootPrompt: "test", + graphId, + status: "active", + provider: "anthropic", + model: "claude-sonnet-4-5", + allowInteractiveQuestions: options.allowInteractiveQuestions, + tokenBudget: 100_000, + tokenKillThreshold: 90_000, + globalTokenUsage: { + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + maxRecursionDepth: 3, + maxParallelAgents: 4, + time: { + created: now, + updated: now, + }, + } +} + +function createGraph(runId: string, graphId: string): Orchestrator.TaskGraph { + const now = Date.now() + return { + id: graphId, + orchestrationId: runId, + rootPrompt: "test", + tasks: {}, + adjacencyList: {}, + status: "running", + progress: 0, + replanCount: 0, + maxReplanCycles: 3, + validationRetries: 0, + maxValidationRetries: 2, + scratchpad: [], + time: { + created: now, + }, + } +} + describe("tool.question", () => { let askSpy: any beforeEach(() => { + clearAllActiveRuns() delete process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS + const runId = "run-question-test" + const graphId = "graph-question-test" + setActiveRun(createRun(runId, graphId), createGraph(runId, graphId)) + addRunSessionID(runId, ctx.sessionID) askSpy = spyOn(QuestionModule.Question, "ask").mockImplementation(async () => { return [] }) }) afterEach(() => { + clearAllActiveRuns() delete process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS askSpy.mockRestore() }) @@ -66,7 +126,7 @@ describe("tool.question", () => { expect(result.output).toContain(`"What is your favorite animal?"="Dog"`) }) - test("should allow orchestrator-planner questions by default", async () => { + test("should skip orchestrator-planner questions by default", async () => { const tool = await QuestionTool.init() const questions = [ { @@ -85,10 +145,95 @@ describe("tool.question", () => { }, ) + expect(askSpy).not.toHaveBeenCalled() + expect(result.title).toBe("Skipped 1 question") + expect(result.output).toContain("Interactive questions are disabled") + }) + + test("should allow orchestrator-planner questions when interactive override is enabled", async () => { + process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS = "true" + const tool = await QuestionTool.init() + const questions = [ + { + question: "Which scope should we target first?", + header: "Scope", + options: [{ label: "MVP", description: "Ship a playable first slice" }], + }, + ] + + askSpy.mockResolvedValueOnce([["MVP"]]) + const result = await tool.execute( + { questions }, + { + ...ctx, + agent: "orchestrator-planner", + }, + ) + + expect(askSpy).toHaveBeenCalledTimes(1) + expect(result.title).toBe("Asked 1 question") + }) + + test("run-level interactive question flag overrides env disable", async () => { + clearAllActiveRuns() + const runId = "run-question-test-override-true" + const graphId = "graph-question-test-override-true" + setActiveRun(createRun(runId, graphId, { allowInteractiveQuestions: true }), createGraph(runId, graphId)) + addRunSessionID(runId, ctx.sessionID) + + process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS = "false" + const tool = await QuestionTool.init() + const questions = [ + { + question: "Should we scope this to MVP first?", + header: "Scope", + options: [{ label: "MVP", description: "Ship a playable first slice" }], + }, + ] + + askSpy.mockResolvedValueOnce([["MVP"]]) + const result = await tool.execute( + { questions }, + { + ...ctx, + agent: "orchestrator-planner", + }, + ) + expect(askSpy).toHaveBeenCalledTimes(1) expect(result.title).toBe("Asked 1 question") }) + test("run-level interactive question flag overrides env enable", async () => { + clearAllActiveRuns() + const runId = "run-question-test-override-false" + const graphId = "graph-question-test-override-false" + setActiveRun(createRun(runId, graphId, { allowInteractiveQuestions: false }), createGraph(runId, graphId)) + addRunSessionID(runId, ctx.sessionID) + + process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS = "true" + const tool = await QuestionTool.init() + const questions = [ + { + question: "Should we scope this to MVP first?", + header: "Scope", + options: [{ label: "MVP", description: "Ship a playable first slice" }], + }, + ] + + askSpy.mockResolvedValueOnce([["MVP"]]) + const result = await tool.execute( + { questions }, + { + ...ctx, + agent: "orchestrator-planner", + }, + ) + + expect(askSpy).not.toHaveBeenCalled() + expect(result.title).toBe("Skipped 1 question") + }) + test("should skip orchestrator-planner questions when override env disables interactivity", async () => { process.env.ONESHOT_ORCHESTRATOR_ALLOW_INTERACTIVE_QUESTIONS = "false" const tool = await QuestionTool.init() @@ -215,6 +360,76 @@ describe("tool.question", () => { ]) }) + test("falls back gracefully when user question flow is rejected", async () => { + const questions = [ + { + question: "Should we enable telemetry now?", + header: "Telemetry", + options: [ + { label: "Yes", description: "Ship telemetry now" }, + { label: "No", description: "Defer telemetry" }, + ], + }, + ] + + const resolution = await resolveQuestionAnswersWithHierarchy({ + sessionID: "ses_child", + questions, + abort: AbortSignal.any([]), + allowHierarchy: false, + questionTimeoutMs: 5_000, + }, { + async getParentSessionChain() { + return [] + }, + async answerFromParent() { + return new Map() + }, + async askUser() { + throw new QuestionModule.Question.RejectedError() + }, + }) + + expect(resolution.answers).toEqual([[]]) + expect(resolution.byQuestion).toEqual([{ source: "user" }]) + }) + + test("reuses shared question memory and skips re-asking semantically similar questions", async () => { + const tool = await QuestionTool.init() + const initialQuestion = [ + { + question: "Which cache backend should we use for sessions?", + header: "Cache backend", + options: [ + { label: "Redis", description: "Use Redis" }, + { label: "Memory", description: "Use in-memory cache" }, + ], + }, + ] + const followupQuestion = [ + { + question: "What should be our session cache backend?", + header: "Session cache", + options: [ + { label: "Redis", description: "Use Redis" }, + { label: "Memory", description: "Use in-memory cache" }, + ], + }, + ] + + askSpy.mockResolvedValueOnce([["Redis"]]) + + const first = await tool.execute({ questions: initialQuestion }, ctx) + expect(first.title).toBe("Asked 1 question") + expect(askSpy).toHaveBeenCalledTimes(1) + + const second = await tool.execute({ questions: followupQuestion }, ctx) + expect(second.title).toBe("Resolved 1 question") + expect(second.metadata.answeredByMemoryCount).toBe(1) + expect(second.output).toContain(`"What should be our session cache backend?"="Redis"`) + expect(askSpy).toHaveBeenCalledTimes(1) + }) + // intentionally removed the zod validation due to tool call errors, hoping prompting is gonna be good enough // test("should throw an Error for header exceeding 30 characters", async () => { // const tool = await QuestionTool.init()