From 4f9c84678d314b9493af544aa2f8427e055072ac Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Tue, 24 Feb 2026 03:01:37 +0100 Subject: [PATCH 01/10] Add transition scheduling logic for phase-based advancement Implement date-based transition creation, update, and processing for decision process phases. Key changes: - Rewrite createTransitionsForProcess with date-based advancement filtering - Rewrite updateTransitionsForProcess with parallel operations - Refactor transitionMonitor for batch processing and deferred state updates - Move transition creation to publish time instead of instance creation - Add ScheduledTransition type and new exports --- .../decision/createInstanceFromTemplate.ts | 19 +- .../decision/createTransitionsForProcess.ts | 136 ++++++--- .../common/src/services/decision/index.ts | 6 + .../src/services/decision/transitionEngine.ts | 6 - .../services/decision/transitionMonitor.ts | 263 +++++++++++------- .../common/src/services/decision/types.ts | 7 + .../decision/updateDecisionInstance.ts | 12 +- .../decision/updateTransitionsForProcess.ts | 199 +++++++------ 8 files changed, 395 insertions(+), 253 deletions(-) diff --git a/packages/common/src/services/decision/createInstanceFromTemplate.ts b/packages/common/src/services/decision/createInstanceFromTemplate.ts index 09e727926..9664bf4c0 100644 --- a/packages/common/src/services/decision/createInstanceFromTemplate.ts +++ b/packages/common/src/services/decision/createInstanceFromTemplate.ts @@ -12,7 +12,6 @@ import type { User } from '@op/supabase/lib'; import { CommonError, UnauthorizedError } from '../../utils'; import { assertUserByAuthId } from '../assert'; import { generateUniqueProfileSlug } from '../profile/utils'; -import { createTransitionsForProcess } from './createTransitionsForProcess'; import { createDecisionRole } from './decisionRoles'; import { getTemplate } from './getTemplate'; import { @@ -186,22 +185,8 @@ export const createInstanceFromTemplateCore = async ({ return newInstance; }); - // Create scheduled transitions for phases that have date-based advancement AND actual dates set - const hasScheduledDatePhases = instanceData.phases.some( - (phase) => phase.rules?.advancement?.method === 'date' && phase.startDate, - ); - - if (hasScheduledDatePhases) { - try { - await createTransitionsForProcess({ processInstance: instance }); - } catch (error) { - // Log but don't fail instance creation if transitions can't be created - console.error( - 'Failed to create transitions for process instance:', - error, - ); - } - } + // Note: Transitions are NOT created here because the instance is created as DRAFT. + // Transitions are created when the instance is published via updateDecisionInstance. // Fetch the profile with processInstance joined for the response // profileId is guaranteed to be set since we just created it above diff --git a/packages/common/src/services/decision/createTransitionsForProcess.ts b/packages/common/src/services/decision/createTransitionsForProcess.ts index 4f2de85ec..8ba0ec297 100644 --- a/packages/common/src/services/decision/createTransitionsForProcess.ts +++ b/packages/common/src/services/decision/createTransitionsForProcess.ts @@ -1,32 +1,42 @@ -import { db } from '@op/db/client'; -import { decisionProcessTransitions } from '@op/db/schema'; +import { type TransactionType, db, eq } from '@op/db/client'; +import { decisionProcessTransitions, decisionProcesses } from '@op/db/schema'; import type { ProcessInstance } from '@op/db/schema'; import { CommonError } from '../../utils'; -import type { InstanceData, PhaseConfiguration } from './types'; +import type { DecisionInstanceData } from './schemas/instanceData'; +import type { + DecisionSchemaDefinition, + PhaseDefinition, +} from './schemas/types'; +import type { ScheduledTransition } from './types'; -export interface CreateTransitionsInput { +/** + * Creates scheduled transition records for phases with date-based advancement. + * Each transition fires when the current phase's end date arrives. + * + * Note: This function looks up rules from the process schema (template) since + * instance data phases may not include rules when created via API. + */ +export async function createTransitionsForProcess({ + processInstance, + tx, +}: { processInstance: ProcessInstance; -} - -export interface CreateTransitionsResult { + tx?: TransactionType; +}): Promise<{ transitions: Array<{ id: string; fromStateId: string | null; toStateId: string; scheduledDate: Date; }>; -} +}> { + const dbClient = tx ?? db; -/** - * Creates transition records for all phases in a process instance. - * Each transition represents the end of one phase and the start of the next. - */ -export async function createTransitionsForProcess({ - processInstance, -}: CreateTransitionsInput): Promise { try { - const instanceData = processInstance.instanceData as InstanceData; + // Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing, + // but this function is only called for new DecisionInstanceData processes + const instanceData = processInstance.instanceData as DecisionInstanceData; const phases = instanceData.phases; if (!phases || phases.length === 0) { @@ -35,40 +45,80 @@ export async function createTransitionsForProcess({ ); } - const transitionsToCreate = phases.flatMap( - (phase: PhaseConfiguration, index: number) => { - const scheduledDate = phase.endDate ?? phase.startDate; - - // Skip phases that have no dates yet — they don't need transitions - if (!scheduledDate) { - return []; - } - - const fromStateId = index > 0 ? phases[index - 1]?.phaseId : null; - const toStateId = phase.phaseId; - - return [ - { - processInstanceId: processInstance.id, - fromStateId, - toStateId, - scheduledDate: new Date(scheduledDate).toISOString(), - }, - ]; - }, + // Fetch the process schema to get phase rules (instance data may not include rules) + const process = await dbClient._query.decisionProcesses.findFirst({ + where: eq(decisionProcesses.id, processInstance.processId), + }); + + if (!process) { + throw new CommonError( + `Process not found for instance: ${processInstance.id}`, + ); + } + + const processSchema = process.processSchema as DecisionSchemaDefinition; + const schemaPhases = processSchema?.phases || []; + + // Build a map of phase ID to schema phase for quick lookup + const schemaPhasesMap = new Map( + schemaPhases.map((phase) => [phase.id, phase]), ); - const createdTransitions = await db + // Create transitions for phases that use date-based advancement + // A transition is created FROM a phase (when it ends) TO the next phase + const transitionsToCreate: ScheduledTransition[] = []; + + phases.forEach((currentPhase, index) => { + const nextPhase = phases[index + 1]; + // Skip last phase (no next phase to transition to) + if (!nextPhase) { + return; + } + + // Look up rules from instance data first, then fall back to schema + const schemaPhase = schemaPhasesMap.get(currentPhase.phaseId); + const advancementMethod = + currentPhase.rules?.advancement?.method ?? + schemaPhase?.rules?.advancement?.method; + + // Only create transition if current phase uses date-based advancement + if (advancementMethod !== 'date') { + return; + } + + // Schedule transition when the current phase ends + const scheduledDate = currentPhase.endDate; + + if (!scheduledDate) { + throw new CommonError( + `Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`, + ); + } + + // DB columns are named fromStateId/toStateId but store phase IDs + transitionsToCreate.push({ + processInstanceId: processInstance.id, + fromStateId: currentPhase.phaseId, + toStateId: nextPhase.phaseId, + scheduledDate: new Date(scheduledDate).toISOString(), + }); + }); + + if (transitionsToCreate.length === 0) { + return { transitions: [] }; + } + + const createdTransitions = await dbClient .insert(decisionProcessTransitions) .values(transitionsToCreate) .returning(); return { - transitions: createdTransitions.map((t) => ({ - id: t.id, - fromStateId: t.fromStateId, - toStateId: t.toStateId, - scheduledDate: new Date(t.scheduledDate), + transitions: createdTransitions.map((transition) => ({ + id: transition.id, + fromStateId: transition.fromStateId, + toStateId: transition.toStateId, + scheduledDate: new Date(transition.scheduledDate), })), }; } catch (error) { diff --git a/packages/common/src/services/decision/index.ts b/packages/common/src/services/decision/index.ts index 56a1c0c1c..e7e4563f3 100644 --- a/packages/common/src/services/decision/index.ts +++ b/packages/common/src/services/decision/index.ts @@ -78,3 +78,9 @@ export type { DecisionInstanceData, PhaseInstanceData, } from './schemas/instanceData'; +export type { + DecisionSchemaDefinition, + PhaseDefinition, + PhaseRules, + ProcessConfig, +} from './schemas/types'; diff --git a/packages/common/src/services/decision/transitionEngine.ts b/packages/common/src/services/decision/transitionEngine.ts index 105c1e0b6..04c91dee6 100644 --- a/packages/common/src/services/decision/transitionEngine.ts +++ b/packages/common/src/services/decision/transitionEngine.ts @@ -73,12 +73,6 @@ export class TransitionEngine { const process = instance.process as any; const processSchema = process.processSchema as ProcessSchema; const instanceData = instance.instanceData as InstanceData; - console.log( - 'TRANSITION', - instanceData.currentPhaseId, - instance.currentStateId, - instanceData, - ); const currentStateId = instanceData.currentPhaseId || instance.currentStateId || ''; diff --git a/packages/common/src/services/decision/transitionMonitor.ts b/packages/common/src/services/decision/transitionMonitor.ts index fc436f6af..c9cc70f6b 100644 --- a/packages/common/src/services/decision/transitionMonitor.ts +++ b/packages/common/src/services/decision/transitionMonitor.ts @@ -1,14 +1,31 @@ -import { db, eq, lte, sql } from '@op/db/client'; +import { and, db, eq, isNull, lte, sql } from '@op/db/client'; import { + type DecisionProcess, + type DecisionProcessTransition, + type ProcessInstance, + ProcessStatus, decisionProcessTransitions, - decisionProcesses, processInstances, } from '@op/db/schema'; import pMap from 'p-map'; import { CommonError } from '../../utils'; -// import { processResults } from './processResults'; -import type { ProcessSchema, StateDefinition } from './types'; +import { processResults } from './processResults'; +import type { DecisionInstanceData } from './schemas/instanceData'; + +/** Transition with nested process instance and process relations */ +type TransitionWithRelations = DecisionProcessTransition & { + processInstance: ProcessInstance & { + process: DecisionProcess; + }; +}; + +/** Result of processing a single transition */ +interface ProcessedTransition { + toStateId: string; + isTransitioningToFinalState: boolean; + processInstanceId: string; +} export interface ProcessDecisionsTransitionsResult { processed: number; @@ -33,14 +50,30 @@ export async function processDecisionsTransitions(): Promise + // Query due transitions, filtering to only include published instances + // Draft, completed, and cancelled instances should not have their transitions processed + const dueTransitions = await db + .select({ + id: decisionProcessTransitions.id, + processInstanceId: decisionProcessTransitions.processInstanceId, + fromStateId: decisionProcessTransitions.fromStateId, + toStateId: decisionProcessTransitions.toStateId, + scheduledDate: decisionProcessTransitions.scheduledDate, + completedAt: decisionProcessTransitions.completedAt, + }) + .from(decisionProcessTransitions) + .innerJoin( + processInstances, + eq(decisionProcessTransitions.processInstanceId, processInstances.id), + ) + .where( and( - isNull(transitions.completedAt), - lte(transitions.scheduledDate, now), + isNull(decisionProcessTransitions.completedAt), + lte(decisionProcessTransitions.scheduledDate, now), + eq(processInstances.status, ProcessStatus.PUBLISHED), ), - orderBy: (transitions, { asc }) => [asc(transitions.scheduledDate)], - }); + ) + .orderBy(decisionProcessTransitions.scheduledDate); // Group transitions by processInstanceId to avoid race conditions // within the same process instance @@ -57,16 +90,18 @@ export async function processDecisionsTransitions(): Promise { + async ([processInstanceId, transitions]) => { + let lastSuccessfulTransition: ProcessedTransition | null = null; + // Process this process's transitions sequentially for (const transition of transitions) { try { - await processTransition(transition.id); - + // Process transition (marks completedAt, returns state info) + lastSuccessfulTransition = await processTransition(transition.id); result.processed++; } catch (error) { result.failed++; @@ -81,10 +116,52 @@ export async function processDecisionsTransitions(): Promise { - const transition = await db._query.decisionProcessTransitions.findFirst({ - where: eq(decisionProcessTransitions.id, transitionId), - }); +async function processTransition( + transitionId: string, +): Promise { + // Fetch transition with related process instance and process in a single query + const transitionResult = await db._query.decisionProcessTransitions.findFirst( + { + where: eq(decisionProcessTransitions.id, transitionId), + with: { + processInstance: { + with: { + process: true, + }, + }, + }, + }, + ); - if (!transition) { + if (!transitionResult) { throw new CommonError( `Transition not found: ${transitionId}. It may have been deleted or the ID is invalid.`, ); } - if (transition.completedAt) { - return; - } - - // Get the process instance to check if we're transitioning to a final state - const processInstance = await db._query.processInstances.findFirst({ - where: eq(processInstances.id, transition.processInstanceId), - }); + // Type assertion for the nested relations (Drizzle's type inference doesn't handle nested `with` well) + const transition = transitionResult as TransitionWithRelations; + const processInstance = transition.processInstance; if (!processInstance) { throw new CommonError( `Process instance not found: ${transition.processInstanceId}`, ); } - // Get the process schema to check the state type - const process = await db._query.decisionProcesses.findFirst({ - where: eq(decisionProcesses.id, processInstance.processId), - }); - + const process = processInstance.process; if (!process) { throw new CommonError(`Process not found: ${processInstance.processId}`); } - const processSchema = process.processSchema as ProcessSchema; - const toState = processSchema.states.find( - (state: StateDefinition) => state.id === transition.toStateId, - ); + // Determine if transitioning to final state using instanceData.phases + // In the new schema format, the last phase is always the final state + const instanceData = processInstance.instanceData as DecisionInstanceData; + const phases = instanceData.phases; + const lastPhaseId = phases[phases.length - 1]?.phaseId; + const isTransitioningToFinalState = transition.toStateId === lastPhaseId; - const isTransitioningToFinalState = toState?.type === 'final'; - - // Update both the process instance and transition in a single transaction - // to ensure atomicity and prevent partial state updates - // Note: We rely on foreign key constraints to ensure the process instance exists - await db.transaction(async (tx) => { - // Update the process instance to the new state - // Use jsonb_set to update only the currentStateId field without reading the entire instanceData - await tx - .update(processInstances) - .set({ - currentStateId: transition.toStateId, - instanceData: sql`jsonb_set( - ${processInstances.instanceData}, - '{currentStateId}', - to_jsonb(${transition.toStateId}::text) - )`, - }) - .where(eq(processInstances.id, transition.processInstanceId)); + // Only mark the transition as completed (state update happens after all transitions) + await db + .update(decisionProcessTransitions) + .set({ + completedAt: new Date().toISOString(), + }) + .where(eq(decisionProcessTransitions.id, transitionId)); - // Mark the transition as completed - await tx - .update(decisionProcessTransitions) - .set({ - completedAt: new Date().toISOString(), - }) - .where(eq(decisionProcessTransitions.id, transition.id)); - }); - - // If transitioning to a final state (results phase), process the results - // This is done AFTER the transition is marked complete to ensure the state change - // is committed even if results processing fails - if (isTransitioningToFinalState) { - try { - console.log( - `Processing results for process instance ${transition.processInstanceId}`, - ); - console.log('RESULT PROCESSING DISABLED'); - // Disabling for now as none are defined in production yet. We need to migrate current processes first. - /* - const result = await processResults({ - processInstanceId: transition.processInstanceId, - }); - - if (!result.success) { - console.error( - `Results processing failed for process instance ${transition.processInstanceId}:`, - result.error, - ); - } else { - console.log( - `Results processed successfully for process instance ${transition.processInstanceId}. Selected ${result.selectedProposalIds.length} proposals.`, - ); - } - */ - } catch (error) { - // Log the error but don't fail the transition - // The transition to the results phase has already been completed - console.error( - `Error processing results for process instance ${transition.processInstanceId}:`, - error, - ); - } - } + // Return info needed for state update + return { + toStateId: transition.toStateId, + isTransitioningToFinalState, + processInstanceId: transition.processInstanceId, + }; +} + +/** + * Updates the process instance state to the specified state. + * Called after all transitions for an instance have been processed. + */ +async function updateInstanceState( + processInstanceId: string, + toStateId: string, +): Promise { + await db + .update(processInstances) + .set({ + currentStateId: toStateId, + instanceData: sql`jsonb_set( + ${processInstances.instanceData}, + '{currentPhaseId}', + to_jsonb(${toStateId}::text) + )`, + }) + .where(eq(processInstances.id, processInstanceId)); } diff --git a/packages/common/src/services/decision/types.ts b/packages/common/src/services/decision/types.ts index 2eb7c8594..ceb1b428d 100644 --- a/packages/common/src/services/decision/types.ts +++ b/packages/common/src/services/decision/types.ts @@ -1,3 +1,4 @@ +import type { DecisionProcessTransition } from '@op/db/schema'; import type { JSONSchema7 } from 'json-schema'; import type { SelectionPipeline } from './selectionPipeline/types'; @@ -195,3 +196,9 @@ export interface DecisionData { // Decision content should match the decisionDefinition schema [key: string]: unknown; } + +/** A scheduled phase transition (fromStateId/toStateId store phase IDs) */ +export type ScheduledTransition = Pick< + DecisionProcessTransition, + 'processInstanceId' | 'fromStateId' | 'toStateId' | 'scheduledDate' +>; diff --git a/packages/common/src/services/decision/updateDecisionInstance.ts b/packages/common/src/services/decision/updateDecisionInstance.ts index e0aba7f10..81efbaea1 100644 --- a/packages/common/src/services/decision/updateDecisionInstance.ts +++ b/packages/common/src/services/decision/updateDecisionInstance.ts @@ -10,6 +10,7 @@ import { assertAccess, permission } from 'access-zones'; import { CommonError, NotFoundError } from '../../utils'; import { getProfileAccessUser } from '../access'; +import { createTransitionsForProcess } from './createTransitionsForProcess'; import { schemaValidator } from './schemaValidator'; import type { DecisionInstanceData, @@ -212,14 +213,23 @@ export const updateDecisionInstance = async ({ // Determine the final status (updated or existing) const finalStatus = status ?? existingInstance.status; + const isBeingPublished = + status === ProcessStatus.PUBLISHED && + existingInstance.status === ProcessStatus.DRAFT; // If status is DRAFT, remove all transitions if (finalStatus === ProcessStatus.DRAFT) { await tx .delete(decisionProcessTransitions) .where(eq(decisionProcessTransitions.processInstanceId, instanceId)); + } else if (isBeingPublished) { + // When publishing a draft, create transitions for all date-based phases + await createTransitionsForProcess({ + processInstance: updatedInstance, + tx, + }); } else if (phases && phases.length > 0) { - // If phases were updated and not DRAFT, update the corresponding transitions + // If phases were updated and already published, update the corresponding transitions await updateTransitionsForProcess({ processInstance: updatedInstance, tx, diff --git a/packages/common/src/services/decision/updateTransitionsForProcess.ts b/packages/common/src/services/decision/updateTransitionsForProcess.ts index 7e2cf33a7..833edfc09 100644 --- a/packages/common/src/services/decision/updateTransitionsForProcess.ts +++ b/packages/common/src/services/decision/updateTransitionsForProcess.ts @@ -4,7 +4,8 @@ import type { ProcessInstance } from '@op/db/schema'; import pMap from 'p-map'; import { CommonError } from '../../utils'; -import type { InstanceData, PhaseConfiguration } from './types'; +import type { DecisionInstanceData } from './schemas/instanceData'; +import type { ScheduledTransition } from './types'; export interface UpdateTransitionsInput { processInstance: ProcessInstance; @@ -19,10 +20,11 @@ export interface UpdateTransitionsResult { /** * Updates transition records for a process instance when phase dates change. + * Only handles phases with date-based advancement (rules.advancement.method === 'date'). * This function: * - Updates existing transitions with new scheduled dates - * - Creates new transitions for newly added phases - * - Deletes transitions for removed phases + * - Creates new transitions for newly added date-based phases + * - Deletes transitions for phases that no longer use date-based advancement * - Prevents updates to completed transitions (results phase is locked) */ export async function updateTransitionsForProcess({ @@ -32,7 +34,9 @@ export async function updateTransitionsForProcess({ const dbClient = tx ?? db; try { - const instanceData = processInstance.instanceData as InstanceData; + // Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing, + // but this function is only called for new DecisionInstanceData processes + const instanceData = processInstance.instanceData as DecisionInstanceData; const phases = instanceData.phases; if (!phases || phases.length === 0) { @@ -57,95 +61,122 @@ export async function updateTransitionsForProcess({ deleted: 0, }; - // Build a map of expected transitions from the current phases - const expectedTransitions = phases.flatMap( - (phase: PhaseConfiguration, index: number) => { - const scheduledDate = phase.endDate ?? phase.startDate; - - // Skip phases that have no dates yet — they don't need transitions - if (!scheduledDate) { - return []; - } - - const fromStateId = index > 0 ? phases[index - 1]?.phaseId : null; - const toStateId = phase.phaseId; - - return [ - { - fromStateId, - toStateId, - scheduledDate: new Date(scheduledDate).toISOString(), - }, - ]; - }, - ); - - // Process each expected transition in parallel - const updateResults = await pMap( - expectedTransitions, - async (expected) => { - // Find matching existing transition by toStateId - const existing = existingTransitions.find( - (t) => t.toStateId === expected.toStateId, - ); + // Build expected transitions for phases with date-based advancement + // A transition is created FROM a phase (when it ends) TO the next phase + const expectedTransitions: ScheduledTransition[] = []; - if (existing) { - // If transition is already completed, don't update it (results phase is locked) - if (existing.completedAt) { - return { action: 'skipped' as const }; - } - - // Update the scheduled date if it changed - if (existing.scheduledDate !== expected.scheduledDate) { - await dbClient - .update(decisionProcessTransitions) - .set({ - scheduledDate: expected.scheduledDate, - }) - .where(eq(decisionProcessTransitions.id, existing.id)); - - return { action: 'updated' as const }; - } + phases.forEach((currentPhase, index) => { + const nextPhase = phases[index + 1]; + // Skip last phase (no next phase to transition to) + if (!nextPhase) { + return; + } - return { action: 'unchanged' as const }; - } else { - // Create new transition for this phase - await dbClient.insert(decisionProcessTransitions).values({ - processInstanceId: processInstance.id, - fromStateId: expected.fromStateId, - toStateId: expected.toStateId, - scheduledDate: expected.scheduledDate, - }); - - return { action: 'created' as const }; - } - }, - { concurrency: 5 }, - ); + // Only create transition if current phase uses date-based advancement + if (currentPhase.rules?.advancement?.method !== 'date') { + return; + } - // Aggregate results - result.updated = updateResults.filter((r) => r.action === 'updated').length; - result.created = updateResults.filter((r) => r.action === 'created').length; + // Schedule transition when the current phase ends + const scheduledDate = currentPhase.endDate; - // Delete transitions that are no longer in the phases list - // But only delete uncompleted transitions - const expectedStateIds = new Set( - expectedTransitions.map((t) => t.toStateId), + if (!scheduledDate) { + throw new CommonError( + `Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`, + ); + } + + // DB columns are named fromStateId/toStateId but store phase IDs + expectedTransitions.push({ + processInstanceId: processInstance.id, + fromStateId: currentPhase.phaseId, + toStateId: nextPhase.phaseId, + scheduledDate: new Date(scheduledDate).toISOString(), + }); + }); + + // Calculate transitions to delete upfront (those not in expected set and not completed) + // Use composite key (fromStateId:toStateId) to match both fields + const expectedTransitionKeys = new Set( + expectedTransitions.map( + (transition) => `${transition.fromStateId}:${transition.toStateId}`, + ), ); const transitionsToDelete = existingTransitions.filter( - (t) => !expectedStateIds.has(t.toStateId) && !t.completedAt, + (transition) => + !expectedTransitionKeys.has( + `${transition.fromStateId}:${transition.toStateId}`, + ) && !transition.completedAt, ); - await pMap( - transitionsToDelete, - async (transition) => { - await dbClient - .delete(decisionProcessTransitions) - .where(eq(decisionProcessTransitions.id, transition.id)); - }, - { concurrency: 5 }, - ); + // Run update/create and delete operations in parallel since they operate on mutually exclusive sets + const [updateResults] = await Promise.all([ + // Update existing transitions or create new ones + pMap( + expectedTransitions, + async (expected) => { + // Find matching existing transition by both fromStateId and toStateId + const existing = existingTransitions.find( + (transition) => + transition.fromStateId === expected.fromStateId && + transition.toStateId === expected.toStateId, + ); + + if (existing) { + // If transition is already completed, don't update it (results phase is locked) + if (existing.completedAt) { + return { action: 'skipped' as const }; + } + + // Update the scheduled date if it changed (compare as timestamps to handle format differences) + if ( + new Date(existing.scheduledDate).getTime() !== + new Date(expected.scheduledDate).getTime() + ) { + await dbClient + .update(decisionProcessTransitions) + .set({ + scheduledDate: expected.scheduledDate, + }) + .where(eq(decisionProcessTransitions.id, existing.id)); + + return { action: 'updated' as const }; + } + + return { action: 'unchanged' as const }; + } else { + // Create new transition for this phase + await dbClient.insert(decisionProcessTransitions).values({ + processInstanceId: expected.processInstanceId, + fromStateId: expected.fromStateId, + toStateId: expected.toStateId, + scheduledDate: expected.scheduledDate, + }); + + return { action: 'created' as const }; + } + }, + { concurrency: 5 }, + ), + // Delete transitions that are no longer in the expected set + pMap( + transitionsToDelete, + async (transition) => { + await dbClient + .delete(decisionProcessTransitions) + .where(eq(decisionProcessTransitions.id, transition.id)); + }, + { concurrency: 5 }, + ), + ]); + // Aggregate results + result.updated = updateResults.filter( + (update) => update.action === 'updated', + ).length; + result.created = updateResults.filter( + (update) => update.action === 'created', + ).length; result.deleted = transitionsToDelete.length; return result; From 0ce7fe563f4adf827538650ca7c27d0b1e22915a Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Tue, 24 Feb 2026 03:27:50 +0100 Subject: [PATCH 02/10] Update createInstanceFromTemplate test for DRAFT transition behavior Transitions are no longer created on instance creation (DRAFT status). They are created when the instance is published. --- .../createInstanceFromTemplate.test.ts | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts b/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts index 2c0602eae..d852f197e 100644 --- a/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts +++ b/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts @@ -102,7 +102,7 @@ describe.concurrent('createInstanceFromTemplate', () => { expect(instanceData.currentPhaseId).toBe('submission'); }); - it('should create transitions for phases with date-based advancement', async ({ + it('should NOT create transitions for DRAFT instances (transitions are created on publish)', async ({ task, onTestFinished, }) => { @@ -152,37 +152,18 @@ describe.concurrent('createInstanceFromTemplate', () => { testData.trackProfileForCleanup(result.id); expect(result.processInstance.id).toBeDefined(); + expect(result.processInstance.status).toBe('draft'); - // Verify transitions were created + // Verify NO transitions were created for DRAFT instance + // Transitions are only created when the instance is published const transitions = await db._query.decisionProcessTransitions.findMany({ where: eq( decisionProcessTransitions.processInstanceId, result.processInstance.id, ), - orderBy: (transitions, { asc }) => [asc(transitions.scheduledDate)], }); - // The simple template has 4 phases with date-based advancement - // Each phase that has a date-based advancement gets a transition created - expect(transitions.length).toBeGreaterThanOrEqual(3); - - // Verify transition details - check that key transitions exist - const submissionToReview = transitions.find( - (t) => t.fromStateId === 'submission' && t.toStateId === 'review', - ); - const reviewToVoting = transitions.find( - (t) => t.fromStateId === 'review' && t.toStateId === 'voting', - ); - const votingToResults = transitions.find( - (t) => t.fromStateId === 'voting' && t.toStateId === 'results', - ); - - expect(submissionToReview).toBeDefined(); - expect(submissionToReview!.completedAt).toBeNull(); - - expect(reviewToVoting).toBeDefined(); - - expect(votingToResults).toBeDefined(); + expect(transitions.length).toBe(0); }); it('should accept phase settings', async ({ task, onTestFinished }) => { From 785dceb5405e520cd419581a933b63e0fb279520 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Tue, 24 Feb 2026 03:41:42 +0100 Subject: [PATCH 03/10] Migrate to db.query (v2 relations) where supported Use v2 relational query API in createInstanceFromTemplate and updateDecisionInstance. transitionEngine and transitionMonitor remain on db._query as their relations are not yet in v2. --- .../services/decision/createInstanceFromTemplate.ts | 6 +++--- .../src/services/decision/updateDecisionInstance.ts | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/common/src/services/decision/createInstanceFromTemplate.ts b/packages/common/src/services/decision/createInstanceFromTemplate.ts index 9664bf4c0..7c067bb9d 100644 --- a/packages/common/src/services/decision/createInstanceFromTemplate.ts +++ b/packages/common/src/services/decision/createInstanceFromTemplate.ts @@ -1,4 +1,4 @@ -import { db, eq } from '@op/db/client'; +import { db } from '@op/db/client'; import { EntityType, ProcessStatus, @@ -190,8 +190,8 @@ export const createInstanceFromTemplateCore = async ({ // Fetch the profile with processInstance joined for the response // profileId is guaranteed to be set since we just created it above - const profile = await db._query.profiles.findFirst({ - where: eq(profiles.id, instance.profileId!), + const profile = await db.query.profiles.findFirst({ + where: { id: instance.profileId! }, with: { processInstance: true, }, diff --git a/packages/common/src/services/decision/updateDecisionInstance.ts b/packages/common/src/services/decision/updateDecisionInstance.ts index 81efbaea1..bf66f7e3a 100644 --- a/packages/common/src/services/decision/updateDecisionInstance.ts +++ b/packages/common/src/services/decision/updateDecisionInstance.ts @@ -50,8 +50,8 @@ export const updateDecisionInstance = async ({ user: User; }) => { // Fetch existing instance - const existingInstance = await db._query.processInstances.findFirst({ - where: eq(processInstances.id, instanceId), + const existingInstance = await db.query.processInstances.findFirst({ + where: { id: instanceId }, }); if (!existingInstance) { @@ -179,8 +179,8 @@ export const updateDecisionInstance = async ({ // Only update if there's something to update if (Object.keys(updateData).length === 0) { // Nothing to update, just return the existing profile - const profile = await db._query.profiles.findFirst({ - where: eq(profiles.id, profileId), + const profile = await db.query.profiles.findFirst({ + where: { id: profileId }, with: { processInstance: true, }, @@ -238,8 +238,8 @@ export const updateDecisionInstance = async ({ }); // Fetch the profile with processInstance joined for the response - const profile = await db._query.profiles.findFirst({ - where: eq(profiles.id, profileId), + const profile = await db.query.profiles.findFirst({ + where: { id: profileId }, with: { processInstance: true, }, From d279ad9ad9016bb97cff29ad447058ee94293bc5 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Tue, 24 Feb 2026 03:46:15 +0100 Subject: [PATCH 04/10] Read phase rules from instance data instead of schema lookup Instance data phases always contain rules (copied from template at creation time), so the extra process schema fetch is unnecessary. Removes the schema lookup and reads advancement rules directly from instanceData.phases[].rules. --- .../decision/createTransitionsForProcess.ts | 39 +++---------------- 1 file changed, 5 insertions(+), 34 deletions(-) diff --git a/packages/common/src/services/decision/createTransitionsForProcess.ts b/packages/common/src/services/decision/createTransitionsForProcess.ts index 8ba0ec297..cb8da08e6 100644 --- a/packages/common/src/services/decision/createTransitionsForProcess.ts +++ b/packages/common/src/services/decision/createTransitionsForProcess.ts @@ -1,21 +1,17 @@ -import { type TransactionType, db, eq } from '@op/db/client'; -import { decisionProcessTransitions, decisionProcesses } from '@op/db/schema'; +import { type TransactionType, db } from '@op/db/client'; +import { decisionProcessTransitions } from '@op/db/schema'; import type { ProcessInstance } from '@op/db/schema'; import { CommonError } from '../../utils'; import type { DecisionInstanceData } from './schemas/instanceData'; -import type { - DecisionSchemaDefinition, - PhaseDefinition, -} from './schemas/types'; import type { ScheduledTransition } from './types'; /** * Creates scheduled transition records for phases with date-based advancement. * Each transition fires when the current phase's end date arrives. * - * Note: This function looks up rules from the process schema (template) since - * instance data phases may not include rules when created via API. + * Rules are read from the instance's phase data (instanceData.phases[].rules), + * which are populated from the template when the instance is created. */ export async function createTransitionsForProcess({ processInstance, @@ -45,25 +41,6 @@ export async function createTransitionsForProcess({ ); } - // Fetch the process schema to get phase rules (instance data may not include rules) - const process = await dbClient._query.decisionProcesses.findFirst({ - where: eq(decisionProcesses.id, processInstance.processId), - }); - - if (!process) { - throw new CommonError( - `Process not found for instance: ${processInstance.id}`, - ); - } - - const processSchema = process.processSchema as DecisionSchemaDefinition; - const schemaPhases = processSchema?.phases || []; - - // Build a map of phase ID to schema phase for quick lookup - const schemaPhasesMap = new Map( - schemaPhases.map((phase) => [phase.id, phase]), - ); - // Create transitions for phases that use date-based advancement // A transition is created FROM a phase (when it ends) TO the next phase const transitionsToCreate: ScheduledTransition[] = []; @@ -75,14 +52,8 @@ export async function createTransitionsForProcess({ return; } - // Look up rules from instance data first, then fall back to schema - const schemaPhase = schemaPhasesMap.get(currentPhase.phaseId); - const advancementMethod = - currentPhase.rules?.advancement?.method ?? - schemaPhase?.rules?.advancement?.method; - // Only create transition if current phase uses date-based advancement - if (advancementMethod !== 'date') { + if (currentPhase.rules?.advancement?.method !== 'date') { return; } From 90db95e8b7713277eb887df4b89e74464b1fa3ca Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Mon, 2 Mar 2026 13:02:22 +0100 Subject: [PATCH 05/10] Remove unused process inclusion --- .../services/decision/transitionMonitor.ts | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/packages/common/src/services/decision/transitionMonitor.ts b/packages/common/src/services/decision/transitionMonitor.ts index c9cc70f6b..842467c0f 100644 --- a/packages/common/src/services/decision/transitionMonitor.ts +++ b/packages/common/src/services/decision/transitionMonitor.ts @@ -1,6 +1,5 @@ import { and, db, eq, isNull, lte, sql } from '@op/db/client'; import { - type DecisionProcess, type DecisionProcessTransition, type ProcessInstance, ProcessStatus, @@ -13,11 +12,9 @@ import { CommonError } from '../../utils'; import { processResults } from './processResults'; import type { DecisionInstanceData } from './schemas/instanceData'; -/** Transition with nested process instance and process relations */ +/** Transition with nested process instance relation */ type TransitionWithRelations = DecisionProcessTransition & { - processInstance: ProcessInstance & { - process: DecisionProcess; - }; + processInstance: ProcessInstance; }; /** Result of processing a single transition */ @@ -122,7 +119,11 @@ export async function processDecisionsTransitions(): Promise { - // Fetch transition with related process instance and process in a single query + // Fetch transition with related process instance in a single query const transitionResult = await db._query.decisionProcessTransitions.findFirst( { where: eq(decisionProcessTransitions.id, transitionId), with: { - processInstance: { - with: { - process: true, - }, - }, + processInstance: true, }, }, ); @@ -217,16 +214,19 @@ async function processTransition( ); } - const process = processInstance.process; - if (!process) { - throw new CommonError(`Process not found: ${processInstance.processId}`); - } - // Determine if transitioning to final state using instanceData.phases // In the new schema format, the last phase is always the final state const instanceData = processInstance.instanceData as DecisionInstanceData; const phases = instanceData.phases; - const lastPhaseId = phases[phases.length - 1]?.phaseId; + + if (!phases || phases.length === 0) { + throw new CommonError( + `Process instance ${processInstance.id} has no phases defined in instanceData`, + ); + } + + // Safe to assert non-null after the length check above + const lastPhaseId = phases[phases.length - 1]!.phaseId; const isTransitioningToFinalState = transition.toStateId === lastPhaseId; // Only mark the transition as completed (state update happens after all transitions) From f91f2400c7487e7a842b65ed33db1f669acf3f24 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Mon, 2 Mar 2026 16:45:19 +0100 Subject: [PATCH 06/10] Add transitionEngine tests --- .../instances/transitionEngine.test.ts | 341 +++++++++++ .../instances/transitionMonitor.test.ts | 533 ++++++++++++++++++ 2 files changed, 874 insertions(+) create mode 100644 services/api/src/routers/decision/instances/transitionEngine.test.ts create mode 100644 services/api/src/routers/decision/instances/transitionMonitor.test.ts diff --git a/services/api/src/routers/decision/instances/transitionEngine.test.ts b/services/api/src/routers/decision/instances/transitionEngine.test.ts new file mode 100644 index 000000000..e38048a6d --- /dev/null +++ b/services/api/src/routers/decision/instances/transitionEngine.test.ts @@ -0,0 +1,341 @@ +import { TransitionEngine, simpleVoting } from '@op/common'; +import { db, eq } from '@op/db/client'; +import { + ProcessStatus, + decisionProcesses, + processInstances, + stateTransitionHistory, + users, +} from '@op/db/schema'; +import { describe, expect, it } from 'vitest'; + +import { appRouter } from '../..'; +import { TestDecisionsDataManager } from '../../../test/helpers/TestDecisionsDataManager'; +import { + createIsolatedSession, + createTestContextWithSession, +} from '../../../test/supabase-utils'; +import { createCallerFactory } from '../../../trpcFactory'; + +const createCaller = createCallerFactory(appRouter); + +async function createAuthenticatedCaller(email: string) { + const { session } = await createIsolatedSession(email); + return createCaller(await createTestContextWithSession(session)); +} + +/** + * Legacy process schema format with explicit states and transitions arrays. + * The TransitionEngine reads `processSchema.transitions` from the process record. + */ +const legacyProcessSchema = { + name: 'Legacy Process', + description: 'A process schema with explicit transitions for testing', + states: [ + { id: 'submission', name: 'Submission', type: 'initial' }, + { id: 'review', name: 'Review', type: 'intermediate' }, + { id: 'voting', name: 'Voting', type: 'intermediate' }, + { id: 'results', name: 'Results', type: 'final' }, + ], + transitions: [ + { + id: 'submission-to-review', + name: 'Submit to Review', + from: 'submission', + to: 'review', + rules: { type: 'manual' }, + }, + { + id: 'review-to-voting', + name: 'Review to Voting', + from: 'review', + to: 'voting', + rules: { type: 'manual' }, + }, + { + id: 'voting-to-results', + name: 'Voting to Results', + from: 'voting', + to: 'results', + rules: { type: 'manual' }, + }, + ], + initialState: 'submission', + decisionDefinition: { type: 'object' }, + proposalTemplate: { type: 'object' }, +}; + +/** + * Creates a process template with legacy ProcessSchema format (with transitions array) + * and an instance with proper instanceData for TransitionEngine tests. + */ +async function createLegacyProcessWithInstance( + testData: TestDecisionsDataManager, + taskId: string, + { + currentPhaseId = 'submission', + processSchema = legacyProcessSchema, + }: { + currentPhaseId?: string; + processSchema?: Record; + } = {}, +) { + const setup = await testData.createDecisionSetup({ instanceCount: 0 }); + + const [userRecord] = await db + .select() + .from(users) + .where(eq(users.email, setup.userEmail)); + + if (!userRecord?.profileId) { + throw new Error('Test user must have a profileId'); + } + + // Create process with legacy schema format + const [process] = await db + .insert(decisionProcesses) + .values({ + name: `Legacy Process ${taskId}`, + description: 'Test process for TransitionEngine', + processSchema, + createdByProfileId: userRecord.profileId, + }) + .returning(); + + const caller = await createAuthenticatedCaller(setup.userEmail); + + // Create a simpleVoting template for createInstanceFromTemplate, then re-link to legacy process + const [simpleTemplate] = await db + .insert(decisionProcesses) + .values({ + name: `Simple Template for Engine ${taskId}`, + description: simpleVoting.description, + processSchema: simpleVoting, + createdByProfileId: userRecord.profileId, + }) + .returning(); + + const instanceResult = await caller.decision.createInstanceFromTemplate({ + templateId: simpleTemplate!.id, + name: `Engine Test ${taskId}`, + }); + + testData.trackProfileForCleanup(instanceResult.id); + + const instanceId = instanceResult.processInstance.id; + + // Re-link the instance to our legacy process and set the desired current state + await db + .update(processInstances) + .set({ + processId: process!.id, + currentStateId: currentPhaseId, + status: ProcessStatus.PUBLISHED, + instanceData: { + currentPhaseId, + phases: [ + { phaseId: 'submission', name: 'Submission' }, + { phaseId: 'review', name: 'Review' }, + { phaseId: 'voting', name: 'Voting' }, + { phaseId: 'results', name: 'Results' }, + ], + }, + }) + .where(eq(processInstances.id, instanceId)); + + return { + instanceId, + processId: process!.id, + user: setup.user, + userEmail: setup.userEmail, + caller, + }; +} + +describe.concurrent('TransitionEngine', () => { + it('should return available transitions from current state', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'submission' }, + ); + + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + user, + }); + + expect(result.canTransition).toBe(true); + expect(result.availableTransitions.length).toBeGreaterThanOrEqual(1); + + const reviewTransition = result.availableTransitions.find( + (t) => t.toStateId === 'review', + ); + expect(reviewTransition).toBeDefined(); + expect(reviewTransition!.transitionName).toBe('Submit to Review'); + expect(reviewTransition!.canExecute).toBe(true); + }); + + it('should return no transitions when at final state', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'results' }, + ); + + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + user, + }); + + expect(result.canTransition).toBe(false); + expect(result.availableTransitions.length).toBe(0); + }); + + it('should filter transitions by toStateId', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'submission' }, + ); + + // Check with a specific toStateId + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + toStateId: 'review', + user, + }); + + expect(result.availableTransitions.length).toBe(1); + expect(result.availableTransitions[0]!.toStateId).toBe('review'); + + // Check with a toStateId that doesn't match any transition from current state + const noResult = await TransitionEngine.checkAvailableTransitions({ + instanceId, + toStateId: 'results', + user, + }); + + expect(noResult.canTransition).toBe(false); + expect(noResult.availableTransitions.length).toBe(0); + }); + + it('should execute a valid manual transition', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'submission' }, + ); + + const result = await TransitionEngine.executeTransition({ + data: { + instanceId, + toStateId: 'review', + }, + user, + }); + + expect(result).toBeDefined(); + expect(result!.currentStateId).toBe('review'); + + // Verify state transition history was created + const history = await db._query.stateTransitionHistory.findMany({ + where: eq(stateTransitionHistory.processInstanceId, instanceId), + }); + + expect(history.length).toBe(1); + expect(history[0]!.fromStateId).toBe('submission'); + expect(history[0]!.toStateId).toBe('review'); + }); + + it('should reject transition when conditions are not met', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + // Create a process schema with a transition that has a proposalCount condition + const schemaWithConditions = { + ...legacyProcessSchema, + transitions: [ + { + id: 'submission-to-review', + name: 'Submit to Review', + from: 'submission', + to: 'review', + rules: { + type: 'manual', + conditions: [ + { + type: 'proposalCount', + operator: 'greaterThan', + value: 0, + }, + ], + requireAll: true, + }, + }, + ...legacyProcessSchema.transitions.slice(1), + ], + }; + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { + currentPhaseId: 'submission', + processSchema: schemaWithConditions, + }, + ); + + // No proposals exist, so the proposalCount > 0 condition should fail + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + toStateId: 'review', + user, + }); + + expect(result.availableTransitions.length).toBe(1); + expect(result.availableTransitions[0]!.canExecute).toBe(false); + expect(result.availableTransitions[0]!.failedRules.length).toBeGreaterThan( + 0, + ); + expect(result.canTransition).toBe(false); + }); + + it('should throw NotFoundError for non-existent instance', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const setup = await testData.createDecisionSetup({ instanceCount: 0 }); + + await expect( + TransitionEngine.checkAvailableTransitions({ + instanceId: '00000000-0000-0000-0000-000000000000', + user: setup.user, + }), + ).rejects.toThrow(/not found/i); + }); +}); diff --git a/services/api/src/routers/decision/instances/transitionMonitor.test.ts b/services/api/src/routers/decision/instances/transitionMonitor.test.ts new file mode 100644 index 000000000..83971d31e --- /dev/null +++ b/services/api/src/routers/decision/instances/transitionMonitor.test.ts @@ -0,0 +1,533 @@ +import { + type DecisionInstanceData, + processDecisionsTransitions, + simpleVoting, +} from '@op/common'; +import { db, eq } from '@op/db/client'; +import { + ProcessStatus, + decisionProcessTransitions, + decisionProcesses, + processInstances, + users, +} from '@op/db/schema'; +import { describe, expect, it } from 'vitest'; + +import { appRouter } from '../..'; +import { TestDecisionsDataManager } from '../../../test/helpers/TestDecisionsDataManager'; +import { + createIsolatedSession, + createTestContextWithSession, +} from '../../../test/supabase-utils'; +import { createCallerFactory } from '../../../trpcFactory'; + +const createCaller = createCallerFactory(appRouter); + +async function createAuthenticatedCaller(email: string) { + const { session } = await createIsolatedSession(email); + return createCaller(await createTestContextWithSession(session)); +} + +/** + * Helper to create a template using the simpleVoting schema (4 phases, date-based advancement). + * Returns the template ID and user email. + */ +async function createSimpleTemplate( + testData: TestDecisionsDataManager, + taskId: string, +) { + const setup = await testData.createDecisionSetup({ instanceCount: 0 }); + + const [userRecord] = await db + .select() + .from(users) + .where(eq(users.email, setup.userEmail)); + + if (!userRecord?.profileId) { + throw new Error('Test user must have a profileId'); + } + + const [template] = await db + .insert(decisionProcesses) + .values({ + name: `Simple Template ${taskId}`, + description: simpleVoting.description, + processSchema: simpleVoting, + createdByProfileId: userRecord.profileId, + }) + .returning(); + + return { templateId: template!.id, userEmail: setup.userEmail }; +} + +/** + * Helper to create an instance from a template, publish it, and make transitions due. + * Returns everything needed for transition monitor tests. + */ +async function createPublishedInstanceWithDueTransitions( + testData: TestDecisionsDataManager, + taskId: string, + { + makeDue = true, + phaseDateOffsets, + }: { + makeDue?: boolean; + phaseDateOffsets?: { startOffsetMs: number; endOffsetMs: number }[]; + } = {}, +) { + const { templateId, userEmail } = await createSimpleTemplate( + testData, + taskId, + ); + const caller = await createAuthenticatedCaller(userEmail); + + const now = new Date(); + + // Default: phases spaced 7 days apart in the future + const defaultOffsets = [ + { startOffsetMs: 0, endOffsetMs: 7 * 24 * 60 * 60 * 1000 }, + { + startOffsetMs: 7 * 24 * 60 * 60 * 1000, + endOffsetMs: 14 * 24 * 60 * 60 * 1000, + }, + { + startOffsetMs: 14 * 24 * 60 * 60 * 1000, + endOffsetMs: 21 * 24 * 60 * 60 * 1000, + }, + { startOffsetMs: 21 * 24 * 60 * 60 * 1000, endOffsetMs: 0 }, + ]; + + const offsets = phaseDateOffsets ?? defaultOffsets; + + if (offsets.length !== 4) { + throw new Error( + `phaseDateOffsets must have exactly 4 entries (one per phase), got ${offsets.length}`, + ); + } + + const phases = [ + { + phaseId: 'submission', + startDate: new Date(now.getTime() + offsets[0]!.startOffsetMs).toISOString(), + endDate: new Date(now.getTime() + offsets[0]!.endOffsetMs).toISOString(), + }, + { + phaseId: 'review', + startDate: new Date(now.getTime() + offsets[1]!.startOffsetMs).toISOString(), + endDate: new Date(now.getTime() + offsets[1]!.endOffsetMs).toISOString(), + }, + { + phaseId: 'voting', + startDate: new Date(now.getTime() + offsets[2]!.startOffsetMs).toISOString(), + endDate: new Date(now.getTime() + offsets[2]!.endOffsetMs).toISOString(), + }, + { + phaseId: 'results', + startDate: new Date(now.getTime() + offsets[3]!.startOffsetMs).toISOString(), + }, + ]; + + // Create instance + const result = await caller.decision.createInstanceFromTemplate({ + templateId, + name: `Monitor Test ${taskId}`, + phases, + }); + + testData.trackProfileForCleanup(result.id); + + const instanceId = result.processInstance.id; + + // Publish the instance (this creates transitions) + await caller.decision.updateDecisionInstance({ + instanceId, + status: ProcessStatus.PUBLISHED, + }); + + if (makeDue) { + // Make all transitions due by setting scheduledDate to past times. + // Stagger them to preserve correct ordering (monitor orders by scheduledDate). + const fetchedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + // Sort by the original scheduled date to maintain creation order + fetchedTransitions.sort( + (a, b) => + new Date(a.scheduledDate).getTime() - + new Date(b.scheduledDate).getTime(), + ); + + for (let i = 0; i < fetchedTransitions.length; i++) { + const staggeredPast = new Date( + now.getTime() - (fetchedTransitions.length - i) * 60 * 60 * 1000, + ).toISOString(); + await db + .update(decisionProcessTransitions) + .set({ scheduledDate: staggeredPast }) + .where(eq(decisionProcessTransitions.id, fetchedTransitions[i]!.id)); + } + } + + // Get the transitions + const transitions = await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + return { + instanceId, + transitions, + userEmail, + caller, + templateId, + }; +} + +// Sequential execution: processDecisionsTransitions is a global function that processes +// ALL due transitions, so concurrent tests would race to process each other's transitions. +describe('processDecisionsTransitions', () => { + it('should process a due transition and update instance state', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId } = await createPublishedInstanceWithDueTransitions( + testData, + task.id, + ); + + const result = await processDecisionsTransitions(); + + expect(result.processed).toBeGreaterThanOrEqual(1); + expect(result.failed).toBe(0); + + // Verify instance state was updated + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + + expect(instance).toBeDefined(); + + // All 3 transitions (submission→review, review→voting, voting→results) were due + // so the instance should have advanced to the final state + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('results'); + expect(instance!.currentStateId).toBe('results'); + + // Verify transitions are marked completed + const completedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + for (const transition of completedTransitions) { + expect(transition.completedAt).not.toBeNull(); + } + }); + + it('should NOT process transitions for DRAFT instances', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { templateId, userEmail } = await createSimpleTemplate( + testData, + task.id, + ); + const caller = await createAuthenticatedCaller(userEmail); + + const now = new Date(); + const pastDate = new Date(now.getTime() - 24 * 60 * 60 * 1000); // 1 day ago + + // Create instance with past dates (but don't publish) + const result = await caller.decision.createInstanceFromTemplate({ + templateId, + name: `Draft Monitor Test ${task.id}`, + phases: [ + { + phaseId: 'submission', + startDate: new Date( + pastDate.getTime() - 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: pastDate.toISOString(), + }, + { + phaseId: 'review', + startDate: pastDate.toISOString(), + endDate: new Date( + pastDate.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'voting', + startDate: new Date( + pastDate.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: new Date( + pastDate.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'results', + startDate: new Date( + pastDate.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + ], + }); + + testData.trackProfileForCleanup(result.id); + + // Draft instances don't have transitions created at all, + // but let's manually insert one to be sure the monitor skips it + await db.insert(decisionProcessTransitions).values({ + processInstanceId: result.processInstance.id, + fromStateId: 'submission', + toStateId: 'review', + scheduledDate: pastDate.toISOString(), + }); + + const monitorResult = await processDecisionsTransitions(); + + // The transition should NOT have been processed because the instance is DRAFT + const transitions = await db._query.decisionProcessTransitions.findMany({ + where: eq( + decisionProcessTransitions.processInstanceId, + result.processInstance.id, + ), + }); + + for (const transition of transitions) { + expect(transition.completedAt).toBeNull(); + } + }); + + it('should NOT process future-dated transitions', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + // Create a published instance with future transitions (don't make them due) + const { instanceId, transitions } = + await createPublishedInstanceWithDueTransitions(testData, task.id, { + makeDue: false, + }); + + const result = await processDecisionsTransitions(); + + // Future transitions should not be processed + const refreshedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + for (const transition of refreshedTransitions) { + expect(transition.completedAt).toBeNull(); + } + + // Instance should still be in the initial state + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('submission'); + }); + + it('should process multiple due transitions sequentially for same instance', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, transitions } = + await createPublishedInstanceWithDueTransitions(testData, task.id); + + // All 3 transitions should be due (submission→review, review→voting, voting→results) + expect(transitions.length).toBe(3); + + const result = await processDecisionsTransitions(); + + // All 3 should have been processed + expect(result.processed).toBeGreaterThanOrEqual(3); + expect(result.failed).toBe(0); + + // Instance should be at the final state (results) + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('results'); + expect(instance!.currentStateId).toBe('results'); + + // All transitions should be completed + const completedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + expect(completedTransitions.every((t) => t.completedAt !== null)).toBe( + true, + ); + }); + + it('should skip already-completed transitions', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, transitions } = + await createPublishedInstanceWithDueTransitions(testData, task.id); + + // Manually mark the first transition as completed + const firstTransition = transitions.find( + (t) => t.fromStateId === 'submission', + ); + expect(firstTransition).toBeDefined(); + + const completedTime = new Date().toISOString(); + await db + .update(decisionProcessTransitions) + .set({ completedAt: completedTime }) + .where(eq(decisionProcessTransitions.id, firstTransition!.id)); + + // Also update instance state to match the completed transition + await db + .update(processInstances) + .set({ currentStateId: 'review' }) + .where(eq(processInstances.id, instanceId)); + + const result = await processDecisionsTransitions(); + + // The first transition should not be re-processed + const refreshedFirst = + await db._query.decisionProcessTransitions.findFirst({ + where: eq(decisionProcessTransitions.id, firstTransition!.id), + }); + + // The first transition should still have a completedAt set (it was pre-completed) + expect(refreshedFirst!.completedAt).not.toBeNull(); + + // The remaining 2 transitions should have been processed + // (review→voting, voting→results) + expect(result.processed).toBeGreaterThanOrEqual(2); + }); + + it('should stop processing instance on error and continue others', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + // Create a valid published instance with due transitions + const { instanceId: goodInstanceId } = + await createPublishedInstanceWithDueTransitions(testData, task.id); + + // Create another instance and corrupt its data + const { + templateId, + userEmail, + } = await createSimpleTemplate(testData, `${task.id}-bad`); + const caller2 = await createAuthenticatedCaller(userEmail); + + const now = new Date(); + const badResult = await caller2.decision.createInstanceFromTemplate({ + templateId, + name: `Bad Monitor Test ${task.id}`, + phases: [ + { + phaseId: 'submission', + startDate: now.toISOString(), + endDate: new Date( + now.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'review', + startDate: new Date( + now.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: new Date( + now.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'voting', + startDate: new Date( + now.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: new Date( + now.getTime() + 21 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'results', + startDate: new Date( + now.getTime() + 21 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + ], + }); + + testData.trackProfileForCleanup(badResult.id); + const badInstanceId = badResult.processInstance.id; + + // Publish the bad instance + await caller2.decision.updateDecisionInstance({ + instanceId: badInstanceId, + status: ProcessStatus.PUBLISHED, + }); + + // Make its transitions due with staggered past dates + const badTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq( + decisionProcessTransitions.processInstanceId, + badInstanceId, + ), + }); + + badTransitions.sort( + (a, b) => + new Date(a.scheduledDate).getTime() - + new Date(b.scheduledDate).getTime(), + ); + + for (let i = 0; i < badTransitions.length; i++) { + const staggeredPast = new Date( + now.getTime() - (badTransitions.length - i) * 60 * 60 * 1000, + ).toISOString(); + await db + .update(decisionProcessTransitions) + .set({ scheduledDate: staggeredPast }) + .where(eq(decisionProcessTransitions.id, badTransitions[i]!.id)); + } + + // Corrupt the bad instance's data by setting instanceData to have empty phases + await db + .update(processInstances) + .set({ + instanceData: { currentPhaseId: 'submission', phases: [] }, + }) + .where(eq(processInstances.id, badInstanceId)); + + const monitorResult = await processDecisionsTransitions(); + + // The bad instance should have failed, but the good instance should succeed + expect(monitorResult.failed).toBeGreaterThanOrEqual(1); + expect(monitorResult.processed).toBeGreaterThanOrEqual(1); + expect(monitorResult.errors.length).toBeGreaterThanOrEqual(1); + + // Verify the good instance was processed + const goodInstance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, goodInstanceId), + }); + expect(goodInstance!.currentStateId).toBe('results'); + }); +}); From fb1de90a8ebcb6b1b7ae17681dcf68227ad47b85 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Mon, 2 Mar 2026 17:13:14 +0100 Subject: [PATCH 07/10] format --- .../instances/transitionMonitor.test.ts | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/services/api/src/routers/decision/instances/transitionMonitor.test.ts b/services/api/src/routers/decision/instances/transitionMonitor.test.ts index 83971d31e..2a1622c4a 100644 --- a/services/api/src/routers/decision/instances/transitionMonitor.test.ts +++ b/services/api/src/routers/decision/instances/transitionMonitor.test.ts @@ -108,22 +108,30 @@ async function createPublishedInstanceWithDueTransitions( const phases = [ { phaseId: 'submission', - startDate: new Date(now.getTime() + offsets[0]!.startOffsetMs).toISOString(), + startDate: new Date( + now.getTime() + offsets[0]!.startOffsetMs, + ).toISOString(), endDate: new Date(now.getTime() + offsets[0]!.endOffsetMs).toISOString(), }, { phaseId: 'review', - startDate: new Date(now.getTime() + offsets[1]!.startOffsetMs).toISOString(), + startDate: new Date( + now.getTime() + offsets[1]!.startOffsetMs, + ).toISOString(), endDate: new Date(now.getTime() + offsets[1]!.endOffsetMs).toISOString(), }, { phaseId: 'voting', - startDate: new Date(now.getTime() + offsets[2]!.startOffsetMs).toISOString(), + startDate: new Date( + now.getTime() + offsets[2]!.startOffsetMs, + ).toISOString(), endDate: new Date(now.getTime() + offsets[2]!.endOffsetMs).toISOString(), }, { phaseId: 'results', - startDate: new Date(now.getTime() + offsets[3]!.startOffsetMs).toISOString(), + startDate: new Date( + now.getTime() + offsets[3]!.startOffsetMs, + ).toISOString(), }, ]; @@ -406,10 +414,11 @@ describe('processDecisionsTransitions', () => { const result = await processDecisionsTransitions(); // The first transition should not be re-processed - const refreshedFirst = - await db._query.decisionProcessTransitions.findFirst({ + const refreshedFirst = await db._query.decisionProcessTransitions.findFirst( + { where: eq(decisionProcessTransitions.id, firstTransition!.id), - }); + }, + ); // The first transition should still have a completedAt set (it was pre-completed) expect(refreshedFirst!.completedAt).not.toBeNull(); @@ -430,10 +439,10 @@ describe('processDecisionsTransitions', () => { await createPublishedInstanceWithDueTransitions(testData, task.id); // Create another instance and corrupt its data - const { - templateId, - userEmail, - } = await createSimpleTemplate(testData, `${task.id}-bad`); + const { templateId, userEmail } = await createSimpleTemplate( + testData, + `${task.id}-bad`, + ); const caller2 = await createAuthenticatedCaller(userEmail); const now = new Date(); @@ -485,13 +494,9 @@ describe('processDecisionsTransitions', () => { }); // Make its transitions due with staggered past dates - const badTransitions = - await db._query.decisionProcessTransitions.findMany({ - where: eq( - decisionProcessTransitions.processInstanceId, - badInstanceId, - ), - }); + const badTransitions = await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, badInstanceId), + }); badTransitions.sort( (a, b) => From 3142b1c39a331df5d8f48476cc7463d564176df4 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Wed, 4 Mar 2026 14:40:09 +0100 Subject: [PATCH 08/10] Pull out common functionality --- .../decision/buildExpectedTransitions.ts | 61 ++++++ .../decision/createTransitionsForProcess.ts | 48 +---- .../common/src/services/decision/index.ts | 1 + .../services/decision/transitionMonitor.ts | 85 ++++---- .../decision/updateTransitionsForProcess.ts | 187 +++++++----------- .../buildExpectedTransitions.test.ts | 159 +++++++++++++++ .../instances/transitionMonitor.test.ts | 18 +- 7 files changed, 345 insertions(+), 214 deletions(-) create mode 100644 packages/common/src/services/decision/buildExpectedTransitions.ts create mode 100644 services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts diff --git a/packages/common/src/services/decision/buildExpectedTransitions.ts b/packages/common/src/services/decision/buildExpectedTransitions.ts new file mode 100644 index 000000000..c79502630 --- /dev/null +++ b/packages/common/src/services/decision/buildExpectedTransitions.ts @@ -0,0 +1,61 @@ +import type { ProcessInstance } from '@op/db/schema'; + +import { CommonError } from '../../utils'; +import type { DecisionInstanceData } from './schemas/instanceData'; +import type { ScheduledTransition } from './types'; + +/** + * Builds expected transition records from a process instance's phase data. + * Only creates transitions for phases with date-based advancement + * (rules.advancement.method === 'date'). + * + * Shared by both `createTransitionsForProcess` and `updateTransitionsForProcess`. + */ +export function buildExpectedTransitions( + processInstance: ProcessInstance, +): ScheduledTransition[] { + // Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing, + // but this function is only called for new DecisionInstanceData processes + const instanceData = processInstance.instanceData as DecisionInstanceData; + const phases = instanceData.phases; + + if (!phases || phases.length === 0) { + throw new CommonError( + 'Process instance must have at least one phase configured', + ); + } + + const transitions: ScheduledTransition[] = []; + + phases.forEach((currentPhase, index) => { + const nextPhase = phases[index + 1]; + // Skip last phase (no next phase to transition to) + if (!nextPhase) { + return; + } + + // Only create transition if current phase uses date-based advancement + if (currentPhase.rules?.advancement?.method !== 'date') { + return; + } + + // Schedule transition when the current phase ends + const scheduledDate = currentPhase.endDate; + + if (!scheduledDate) { + throw new CommonError( + `Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`, + ); + } + + // DB columns are named fromStateId/toStateId but store phase IDs + transitions.push({ + processInstanceId: processInstance.id, + fromStateId: currentPhase.phaseId, + toStateId: nextPhase.phaseId, + scheduledDate: new Date(scheduledDate).toISOString(), + }); + }); + + return transitions; +} diff --git a/packages/common/src/services/decision/createTransitionsForProcess.ts b/packages/common/src/services/decision/createTransitionsForProcess.ts index cb8da08e6..96cdea679 100644 --- a/packages/common/src/services/decision/createTransitionsForProcess.ts +++ b/packages/common/src/services/decision/createTransitionsForProcess.ts @@ -3,8 +3,7 @@ import { decisionProcessTransitions } from '@op/db/schema'; import type { ProcessInstance } from '@op/db/schema'; import { CommonError } from '../../utils'; -import type { DecisionInstanceData } from './schemas/instanceData'; -import type { ScheduledTransition } from './types'; +import { buildExpectedTransitions } from './buildExpectedTransitions'; /** * Creates scheduled transition records for phases with date-based advancement. @@ -30,50 +29,7 @@ export async function createTransitionsForProcess({ const dbClient = tx ?? db; try { - // Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing, - // but this function is only called for new DecisionInstanceData processes - const instanceData = processInstance.instanceData as DecisionInstanceData; - const phases = instanceData.phases; - - if (!phases || phases.length === 0) { - throw new CommonError( - 'Process instance must have at least one phase configured', - ); - } - - // Create transitions for phases that use date-based advancement - // A transition is created FROM a phase (when it ends) TO the next phase - const transitionsToCreate: ScheduledTransition[] = []; - - phases.forEach((currentPhase, index) => { - const nextPhase = phases[index + 1]; - // Skip last phase (no next phase to transition to) - if (!nextPhase) { - return; - } - - // Only create transition if current phase uses date-based advancement - if (currentPhase.rules?.advancement?.method !== 'date') { - return; - } - - // Schedule transition when the current phase ends - const scheduledDate = currentPhase.endDate; - - if (!scheduledDate) { - throw new CommonError( - `Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`, - ); - } - - // DB columns are named fromStateId/toStateId but store phase IDs - transitionsToCreate.push({ - processInstanceId: processInstance.id, - fromStateId: currentPhase.phaseId, - toStateId: nextPhase.phaseId, - scheduledDate: new Date(scheduledDate).toISOString(), - }); - }); + const transitionsToCreate = buildExpectedTransitions(processInstance); if (transitionsToCreate.length === 0) { return { transitions: [] }; diff --git a/packages/common/src/services/decision/index.ts b/packages/common/src/services/decision/index.ts index e7e4563f3..aba99397f 100644 --- a/packages/common/src/services/decision/index.ts +++ b/packages/common/src/services/decision/index.ts @@ -18,6 +18,7 @@ export * from './getDecisionBySlug'; // Transition management export { TransitionEngine } from './transitionEngine'; export type { TransitionCheckResult } from './transitionEngine'; +export * from './buildExpectedTransitions'; export * from './createTransitionsForProcess'; export * from './updateTransitionsForProcess'; export * from './transitionMonitor'; diff --git a/packages/common/src/services/decision/transitionMonitor.ts b/packages/common/src/services/decision/transitionMonitor.ts index 842467c0f..8d679ec94 100644 --- a/packages/common/src/services/decision/transitionMonitor.ts +++ b/packages/common/src/services/decision/transitionMonitor.ts @@ -1,7 +1,5 @@ import { and, db, eq, isNull, lte, sql } from '@op/db/client'; import { - type DecisionProcessTransition, - type ProcessInstance, ProcessStatus, decisionProcessTransitions, processInstances, @@ -12,16 +10,10 @@ import { CommonError } from '../../utils'; import { processResults } from './processResults'; import type { DecisionInstanceData } from './schemas/instanceData'; -/** Transition with nested process instance relation */ -type TransitionWithRelations = DecisionProcessTransition & { - processInstance: ProcessInstance; -}; - /** Result of processing a single transition */ interface ProcessedTransition { toStateId: string; isTransitioningToFinalState: boolean; - processInstanceId: string; } export interface ProcessDecisionsTransitionsResult { @@ -47,8 +39,8 @@ export async function processDecisionsTransitions(): Promise { - // Fetch transition with related process instance in a single query - const transitionResult = await db._query.decisionProcessTransitions.findFirst( - { - where: eq(decisionProcessTransitions.id, transitionId), - with: { - processInstance: true, - }, - }, - ); - - if (!transitionResult) { - throw new CommonError( - `Transition not found: ${transitionId}. It may have been deleted or the ID is invalid.`, - ); - } - - // Type assertion for the nested relations (Drizzle's type inference doesn't handle nested `with` well) - const transition = transitionResult as TransitionWithRelations; - - const processInstance = transition.processInstance; - if (!processInstance) { - throw new CommonError( - `Process instance not found: ${transition.processInstanceId}`, - ); - } - +async function processTransition(transition: { + id: string; + processInstanceId: string; + toStateId: string; + instanceData: unknown; +}): Promise { // Determine if transitioning to final state using instanceData.phases // In the new schema format, the last phase is always the final state - const instanceData = processInstance.instanceData as DecisionInstanceData; + const instanceData = transition.instanceData as DecisionInstanceData; const phases = instanceData.phases; if (!phases || phases.length === 0) { throw new CommonError( - `Process instance ${processInstance.id} has no phases defined in instanceData`, + `Process instance ${transition.processInstanceId} has no phases defined in instanceData`, ); } @@ -229,19 +203,31 @@ async function processTransition( const lastPhaseId = phases[phases.length - 1]!.phaseId; const isTransitioningToFinalState = transition.toStateId === lastPhaseId; - // Only mark the transition as completed (state update happens after all transitions) - await db + // Only mark the transition as completed (state update happens after all transitions). + // The WHERE completedAt IS NULL guard + returning() check prevent double-processing + // if a concurrent monitor run already handled this transition. + const updated = await db .update(decisionProcessTransitions) .set({ completedAt: new Date().toISOString(), }) - .where(eq(decisionProcessTransitions.id, transitionId)); + .where( + and( + eq(decisionProcessTransitions.id, transition.id), + isNull(decisionProcessTransitions.completedAt), + ), + ) + .returning({ id: decisionProcessTransitions.id }); + + if (updated.length === 0) { + // Another worker already completed this transition — skip it + return null; + } // Return info needed for state update return { toStateId: transition.toStateId, isTransitioningToFinalState, - processInstanceId: transition.processInstanceId, }; } @@ -257,6 +243,7 @@ async function updateInstanceState( .update(processInstances) .set({ currentStateId: toStateId, + updatedAt: new Date().toISOString(), instanceData: sql`jsonb_set( ${processInstances.instanceData}, '{currentPhaseId}', diff --git a/packages/common/src/services/decision/updateTransitionsForProcess.ts b/packages/common/src/services/decision/updateTransitionsForProcess.ts index 833edfc09..c7b3f58f5 100644 --- a/packages/common/src/services/decision/updateTransitionsForProcess.ts +++ b/packages/common/src/services/decision/updateTransitionsForProcess.ts @@ -1,11 +1,9 @@ -import { type TransactionType, db, eq } from '@op/db/client'; +import { type TransactionType, db, eq, inArray } from '@op/db/client'; import { decisionProcessTransitions } from '@op/db/schema'; import type { ProcessInstance } from '@op/db/schema'; -import pMap from 'p-map'; import { CommonError } from '../../utils'; -import type { DecisionInstanceData } from './schemas/instanceData'; -import type { ScheduledTransition } from './types'; +import { buildExpectedTransitions } from './buildExpectedTransitions'; export interface UpdateTransitionsInput { processInstance: ProcessInstance; @@ -34,16 +32,7 @@ export async function updateTransitionsForProcess({ const dbClient = tx ?? db; try { - // Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing, - // but this function is only called for new DecisionInstanceData processes - const instanceData = processInstance.instanceData as DecisionInstanceData; - const phases = instanceData.phases; - - if (!phases || phases.length === 0) { - throw new CommonError( - 'Process instance must have at least one phase configured', - ); - } + const expectedTransitions = buildExpectedTransitions(processInstance); // Get existing transitions const existingTransitions = @@ -61,40 +50,6 @@ export async function updateTransitionsForProcess({ deleted: 0, }; - // Build expected transitions for phases with date-based advancement - // A transition is created FROM a phase (when it ends) TO the next phase - const expectedTransitions: ScheduledTransition[] = []; - - phases.forEach((currentPhase, index) => { - const nextPhase = phases[index + 1]; - // Skip last phase (no next phase to transition to) - if (!nextPhase) { - return; - } - - // Only create transition if current phase uses date-based advancement - if (currentPhase.rules?.advancement?.method !== 'date') { - return; - } - - // Schedule transition when the current phase ends - const scheduledDate = currentPhase.endDate; - - if (!scheduledDate) { - throw new CommonError( - `Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`, - ); - } - - // DB columns are named fromStateId/toStateId but store phase IDs - expectedTransitions.push({ - processInstanceId: processInstance.id, - fromStateId: currentPhase.phaseId, - toStateId: nextPhase.phaseId, - scheduledDate: new Date(scheduledDate).toISOString(), - }); - }); - // Calculate transitions to delete upfront (those not in expected set and not completed) // Use composite key (fromStateId:toStateId) to match both fields const expectedTransitionKeys = new Set( @@ -109,74 +64,74 @@ export async function updateTransitionsForProcess({ ) && !transition.completedAt, ); - // Run update/create and delete operations in parallel since they operate on mutually exclusive sets - const [updateResults] = await Promise.all([ - // Update existing transitions or create new ones - pMap( - expectedTransitions, - async (expected) => { - // Find matching existing transition by both fromStateId and toStateId - const existing = existingTransitions.find( - (transition) => - transition.fromStateId === expected.fromStateId && - transition.toStateId === expected.toStateId, - ); - - if (existing) { - // If transition is already completed, don't update it (results phase is locked) - if (existing.completedAt) { - return { action: 'skipped' as const }; - } - - // Update the scheduled date if it changed (compare as timestamps to handle format differences) - if ( - new Date(existing.scheduledDate).getTime() !== - new Date(expected.scheduledDate).getTime() - ) { - await dbClient - .update(decisionProcessTransitions) - .set({ - scheduledDate: expected.scheduledDate, - }) - .where(eq(decisionProcessTransitions.id, existing.id)); - - return { action: 'updated' as const }; - } - - return { action: 'unchanged' as const }; - } else { - // Create new transition for this phase - await dbClient.insert(decisionProcessTransitions).values({ - processInstanceId: expected.processInstanceId, - fromStateId: expected.fromStateId, - toStateId: expected.toStateId, - scheduledDate: expected.scheduledDate, - }); - - return { action: 'created' as const }; - } - }, - { concurrency: 5 }, - ), - // Delete transitions that are no longer in the expected set - pMap( - transitionsToDelete, - async (transition) => { - await dbClient - .delete(decisionProcessTransitions) - .where(eq(decisionProcessTransitions.id, transition.id)); - }, - { concurrency: 5 }, - ), - ]); - - // Aggregate results - result.updated = updateResults.filter( - (update) => update.action === 'updated', - ).length; - result.created = updateResults.filter( - (update) => update.action === 'created', - ).length; + // Partition expected transitions into updates and creates + const toCreate: typeof expectedTransitions = []; + const toUpdate: { id: string; scheduledDate: string }[] = []; + + for (const expected of expectedTransitions) { + const existing = existingTransitions.find( + (transition) => + transition.fromStateId === expected.fromStateId && + transition.toStateId === expected.toStateId, + ); + + if (existing) { + // If transition is already completed, don't update it (results phase is locked) + if (existing.completedAt) { + continue; + } + + // Update the scheduled date if it changed + if (existing.scheduledDate !== expected.scheduledDate) { + toUpdate.push({ + id: existing.id, + scheduledDate: expected.scheduledDate, + }); + } + } else { + toCreate.push(expected); + } + } + + // Execute all DB operations concurrently + const ops: Promise[] = []; + + // Batch delete + if (transitionsToDelete.length > 0) { + const idsToDelete = transitionsToDelete.map((t) => t.id); + ops.push( + dbClient + .delete(decisionProcessTransitions) + .where(inArray(decisionProcessTransitions.id, idsToDelete)) + .then(() => {}), + ); + } + + // Batch insert + if (toCreate.length > 0) { + ops.push( + dbClient + .insert(decisionProcessTransitions) + .values(toCreate) + .then(() => {}), + ); + } + + // Updates must remain individual (different values per row) + for (const update of toUpdate) { + ops.push( + dbClient + .update(decisionProcessTransitions) + .set({ scheduledDate: update.scheduledDate }) + .where(eq(decisionProcessTransitions.id, update.id)) + .then(() => {}), + ); + } + + await Promise.all(ops); + + result.updated = toUpdate.length; + result.created = toCreate.length; result.deleted = transitionsToDelete.length; return result; diff --git a/services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts b/services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts new file mode 100644 index 000000000..2fbadd497 --- /dev/null +++ b/services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts @@ -0,0 +1,159 @@ +import { buildExpectedTransitions } from '@op/common'; +import type { ProcessInstance } from '@op/db/schema'; +import { describe, expect, it } from 'vitest'; + +/** + * Creates a minimal ProcessInstance stub for testing buildExpectedTransitions. + * Only `id` and `instanceData` are used by the function. + */ +function stubInstance( + instanceData: unknown, + id = 'test-instance-id', +): ProcessInstance { + return { id, instanceData } as ProcessInstance; +} + +describe('buildExpectedTransitions', () => { + it('should create transitions for date-based phases', () => { + const instance = stubInstance({ + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'date' } }, + endDate: '2026-06-01T00:00:00.000Z', + }, + { + phaseId: 'phase-b', + rules: { advancement: { method: 'date' } }, + endDate: '2026-07-01T00:00:00.000Z', + }, + { + phaseId: 'phase-c', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + + expect(transitions).toHaveLength(2); + expect(transitions[0]).toEqual({ + processInstanceId: 'test-instance-id', + fromStateId: 'phase-a', + toStateId: 'phase-b', + scheduledDate: '2026-06-01T00:00:00.000Z', + }); + expect(transitions[1]).toEqual({ + processInstanceId: 'test-instance-id', + fromStateId: 'phase-b', + toStateId: 'phase-c', + scheduledDate: '2026-07-01T00:00:00.000Z', + }); + }); + + it('should skip phases without date-based advancement', () => { + const instance = stubInstance({ + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'manual' } }, + endDate: '2026-06-01T00:00:00.000Z', + }, + { + phaseId: 'phase-b', + rules: { advancement: { method: 'date' } }, + endDate: '2026-07-01T00:00:00.000Z', + }, + { + phaseId: 'phase-c', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + + // Only phase-b has date advancement, creating one transition (b→c) + expect(transitions).toHaveLength(1); + expect(transitions[0]!.fromStateId).toBe('phase-b'); + expect(transitions[0]!.toStateId).toBe('phase-c'); + }); + + it('should return empty array when no phases use date advancement', () => { + const instance = stubInstance({ + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'manual' } }, + }, + { + phaseId: 'phase-b', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + expect(transitions).toHaveLength(0); + }); + + it('should return empty array for single phase (no next phase to transition to)', () => { + const instance = stubInstance({ + currentPhaseId: 'only-phase', + phases: [ + { + phaseId: 'only-phase', + rules: { advancement: { method: 'date' } }, + endDate: '2026-06-01T00:00:00.000Z', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + expect(transitions).toHaveLength(0); + }); + + it('should throw when phases array is empty', () => { + const instance = stubInstance({ + currentPhaseId: 'x', + phases: [], + }); + + expect(() => buildExpectedTransitions(instance)).toThrow( + 'Process instance must have at least one phase configured', + ); + }); + + it('should throw when phases is undefined', () => { + const instance = stubInstance({ + currentPhaseId: 'x', + }); + + expect(() => buildExpectedTransitions(instance)).toThrow( + 'Process instance must have at least one phase configured', + ); + }); + + it('should throw when date-based phase has no endDate', () => { + const instance = stubInstance( + { + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'date' } }, + // no endDate + }, + { + phaseId: 'phase-b', + }, + ], + }, + 'inst-123', + ); + + expect(() => buildExpectedTransitions(instance)).toThrow( + 'Phase "phase-a" must have an end date for date-based advancement (instance: inst-123)', + ); + }); +}); diff --git a/services/api/src/routers/decision/instances/transitionMonitor.test.ts b/services/api/src/routers/decision/instances/transitionMonitor.test.ts index 2a1622c4a..024307cb0 100644 --- a/services/api/src/routers/decision/instances/transitionMonitor.test.ts +++ b/services/api/src/routers/decision/instances/transitionMonitor.test.ts @@ -206,6 +206,12 @@ describe('processDecisionsTransitions', () => { task.id, ); + // Capture updatedAt before processing + const beforeInstance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + const updatedAtBefore = beforeInstance!.updatedAt; + const result = await processDecisionsTransitions(); expect(result.processed).toBeGreaterThanOrEqual(1); @@ -224,6 +230,12 @@ describe('processDecisionsTransitions', () => { expect(instanceData.currentPhaseId).toBe('results'); expect(instance!.currentStateId).toBe('results'); + // Verify updatedAt was set by the monitor + expect(instance!.updatedAt).toBeDefined(); + expect(new Date(instance!.updatedAt!).getTime()).toBeGreaterThan( + new Date(updatedAtBefore!).getTime(), + ); + // Verify transitions are marked completed const completedTransitions = await db._query.decisionProcessTransitions.findMany({ @@ -298,7 +310,7 @@ describe('processDecisionsTransitions', () => { scheduledDate: pastDate.toISOString(), }); - const monitorResult = await processDecisionsTransitions(); + await processDecisionsTransitions(); // The transition should NOT have been processed because the instance is DRAFT const transitions = await db._query.decisionProcessTransitions.findMany({ @@ -320,12 +332,12 @@ describe('processDecisionsTransitions', () => { const testData = new TestDecisionsDataManager(task.id, onTestFinished); // Create a published instance with future transitions (don't make them due) - const { instanceId, transitions } = + const { instanceId } = await createPublishedInstanceWithDueTransitions(testData, task.id, { makeDue: false, }); - const result = await processDecisionsTransitions(); + await processDecisionsTransitions(); // Future transitions should not be processed const refreshedTransitions = From cee5ada630b2f0791463843cb9f95541afbc8f33 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Wed, 4 Mar 2026 23:20:13 +0100 Subject: [PATCH 09/10] format --- .../routers/decision/instances/transitionMonitor.test.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/services/api/src/routers/decision/instances/transitionMonitor.test.ts b/services/api/src/routers/decision/instances/transitionMonitor.test.ts index 024307cb0..15757720f 100644 --- a/services/api/src/routers/decision/instances/transitionMonitor.test.ts +++ b/services/api/src/routers/decision/instances/transitionMonitor.test.ts @@ -332,10 +332,13 @@ describe('processDecisionsTransitions', () => { const testData = new TestDecisionsDataManager(task.id, onTestFinished); // Create a published instance with future transitions (don't make them due) - const { instanceId } = - await createPublishedInstanceWithDueTransitions(testData, task.id, { + const { instanceId } = await createPublishedInstanceWithDueTransitions( + testData, + task.id, + { makeDue: false, - }); + }, + ); await processDecisionsTransitions(); From af5b51cddff629c82576cbb5733827f8e2723ee9 Mon Sep 17 00:00:00 2001 From: Scott Cazan Date: Thu, 5 Mar 2026 11:22:56 +0100 Subject: [PATCH 10/10] Test concurrent runs --- .../instances/transitionMonitor.test.ts | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/services/api/src/routers/decision/instances/transitionMonitor.test.ts b/services/api/src/routers/decision/instances/transitionMonitor.test.ts index 15757720f..c68172139 100644 --- a/services/api/src/routers/decision/instances/transitionMonitor.test.ts +++ b/services/api/src/routers/decision/instances/transitionMonitor.test.ts @@ -550,4 +550,49 @@ describe('processDecisionsTransitions', () => { }); expect(goodInstance!.currentStateId).toBe('results'); }); + + it('should handle concurrent workers without double-processing', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId } = await createPublishedInstanceWithDueTransitions( + testData, + task.id, + ); + + // Run two monitor invocations concurrently (simulates two Inngest workers) + const [result1, result2] = await Promise.all([ + processDecisionsTransitions(), + processDecisionsTransitions(), + ]); + + // Between the two runs, exactly 3 transitions should have been processed total + const totalProcessed = result1.processed + result2.processed; + expect(totalProcessed).toBeGreaterThanOrEqual(3); + + // Neither run should have failures + expect(result1.failed).toBe(0); + expect(result2.failed).toBe(0); + + // Instance should be at the final state + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('results'); + expect(instance!.currentStateId).toBe('results'); + + // Each transition should have completedAt set exactly once + const completedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + expect(completedTransitions).toHaveLength(3); + for (const transition of completedTransitions) { + expect(transition.completedAt).not.toBeNull(); + } + }); });