diff --git a/packages/common/src/services/decision/buildExpectedTransitions.ts b/packages/common/src/services/decision/buildExpectedTransitions.ts new file mode 100644 index 000000000..c79502630 --- /dev/null +++ b/packages/common/src/services/decision/buildExpectedTransitions.ts @@ -0,0 +1,61 @@ +import type { ProcessInstance } from '@op/db/schema'; + +import { CommonError } from '../../utils'; +import type { DecisionInstanceData } from './schemas/instanceData'; +import type { ScheduledTransition } from './types'; + +/** + * Builds expected transition records from a process instance's phase data. + * Only creates transitions for phases with date-based advancement + * (rules.advancement.method === 'date'). + * + * Shared by both `createTransitionsForProcess` and `updateTransitionsForProcess`. + */ +export function buildExpectedTransitions( + processInstance: ProcessInstance, +): ScheduledTransition[] { + // Type assertion: instanceData is `unknown` in DB to support legacy formats for viewing, + // but this function is only called for new DecisionInstanceData processes + const instanceData = processInstance.instanceData as DecisionInstanceData; + const phases = instanceData.phases; + + if (!phases || phases.length === 0) { + throw new CommonError( + 'Process instance must have at least one phase configured', + ); + } + + const transitions: ScheduledTransition[] = []; + + phases.forEach((currentPhase, index) => { + const nextPhase = phases[index + 1]; + // Skip last phase (no next phase to transition to) + if (!nextPhase) { + return; + } + + // Only create transition if current phase uses date-based advancement + if (currentPhase.rules?.advancement?.method !== 'date') { + return; + } + + // Schedule transition when the current phase ends + const scheduledDate = currentPhase.endDate; + + if (!scheduledDate) { + throw new CommonError( + `Phase "${currentPhase.phaseId}" must have an end date for date-based advancement (instance: ${processInstance.id})`, + ); + } + + // DB columns are named fromStateId/toStateId but store phase IDs + transitions.push({ + processInstanceId: processInstance.id, + fromStateId: currentPhase.phaseId, + toStateId: nextPhase.phaseId, + scheduledDate: new Date(scheduledDate).toISOString(), + }); + }); + + return transitions; +} diff --git a/packages/common/src/services/decision/createInstanceFromTemplate.ts b/packages/common/src/services/decision/createInstanceFromTemplate.ts index 09e727926..7c067bb9d 100644 --- a/packages/common/src/services/decision/createInstanceFromTemplate.ts +++ b/packages/common/src/services/decision/createInstanceFromTemplate.ts @@ -1,4 +1,4 @@ -import { db, eq } from '@op/db/client'; +import { db } from '@op/db/client'; import { EntityType, ProcessStatus, @@ -12,7 +12,6 @@ import type { User } from '@op/supabase/lib'; import { CommonError, UnauthorizedError } from '../../utils'; import { assertUserByAuthId } from '../assert'; import { generateUniqueProfileSlug } from '../profile/utils'; -import { createTransitionsForProcess } from './createTransitionsForProcess'; import { createDecisionRole } from './decisionRoles'; import { getTemplate } from './getTemplate'; import { @@ -186,27 +185,13 @@ export const createInstanceFromTemplateCore = async ({ return newInstance; }); - // Create scheduled transitions for phases that have date-based advancement AND actual dates set - const hasScheduledDatePhases = instanceData.phases.some( - (phase) => phase.rules?.advancement?.method === 'date' && phase.startDate, - ); - - if (hasScheduledDatePhases) { - try { - await createTransitionsForProcess({ processInstance: instance }); - } catch (error) { - // Log but don't fail instance creation if transitions can't be created - console.error( - 'Failed to create transitions for process instance:', - error, - ); - } - } + // Note: Transitions are NOT created here because the instance is created as DRAFT. + // Transitions are created when the instance is published via updateDecisionInstance. // Fetch the profile with processInstance joined for the response // profileId is guaranteed to be set since we just created it above - const profile = await db._query.profiles.findFirst({ - where: eq(profiles.id, instance.profileId!), + const profile = await db.query.profiles.findFirst({ + where: { id: instance.profileId! }, with: { processInstance: true, }, diff --git a/packages/common/src/services/decision/createTransitionsForProcess.ts b/packages/common/src/services/decision/createTransitionsForProcess.ts index 4f2de85ec..96cdea679 100644 --- a/packages/common/src/services/decision/createTransitionsForProcess.ts +++ b/packages/common/src/services/decision/createTransitionsForProcess.ts @@ -1,74 +1,51 @@ -import { db } from '@op/db/client'; +import { type TransactionType, db } from '@op/db/client'; import { decisionProcessTransitions } from '@op/db/schema'; import type { ProcessInstance } from '@op/db/schema'; import { CommonError } from '../../utils'; -import type { InstanceData, PhaseConfiguration } from './types'; +import { buildExpectedTransitions } from './buildExpectedTransitions'; -export interface CreateTransitionsInput { +/** + * Creates scheduled transition records for phases with date-based advancement. + * Each transition fires when the current phase's end date arrives. + * + * Rules are read from the instance's phase data (instanceData.phases[].rules), + * which are populated from the template when the instance is created. + */ +export async function createTransitionsForProcess({ + processInstance, + tx, +}: { processInstance: ProcessInstance; -} - -export interface CreateTransitionsResult { + tx?: TransactionType; +}): Promise<{ transitions: Array<{ id: string; fromStateId: string | null; toStateId: string; scheduledDate: Date; }>; -} +}> { + const dbClient = tx ?? db; -/** - * Creates transition records for all phases in a process instance. - * Each transition represents the end of one phase and the start of the next. - */ -export async function createTransitionsForProcess({ - processInstance, -}: CreateTransitionsInput): Promise { try { - const instanceData = processInstance.instanceData as InstanceData; - const phases = instanceData.phases; + const transitionsToCreate = buildExpectedTransitions(processInstance); - if (!phases || phases.length === 0) { - throw new CommonError( - 'Process instance must have at least one phase configured', - ); + if (transitionsToCreate.length === 0) { + return { transitions: [] }; } - const transitionsToCreate = phases.flatMap( - (phase: PhaseConfiguration, index: number) => { - const scheduledDate = phase.endDate ?? phase.startDate; - - // Skip phases that have no dates yet — they don't need transitions - if (!scheduledDate) { - return []; - } - - const fromStateId = index > 0 ? phases[index - 1]?.phaseId : null; - const toStateId = phase.phaseId; - - return [ - { - processInstanceId: processInstance.id, - fromStateId, - toStateId, - scheduledDate: new Date(scheduledDate).toISOString(), - }, - ]; - }, - ); - - const createdTransitions = await db + const createdTransitions = await dbClient .insert(decisionProcessTransitions) .values(transitionsToCreate) .returning(); return { - transitions: createdTransitions.map((t) => ({ - id: t.id, - fromStateId: t.fromStateId, - toStateId: t.toStateId, - scheduledDate: new Date(t.scheduledDate), + transitions: createdTransitions.map((transition) => ({ + id: transition.id, + fromStateId: transition.fromStateId, + toStateId: transition.toStateId, + scheduledDate: new Date(transition.scheduledDate), })), }; } catch (error) { diff --git a/packages/common/src/services/decision/index.ts b/packages/common/src/services/decision/index.ts index 56a1c0c1c..aba99397f 100644 --- a/packages/common/src/services/decision/index.ts +++ b/packages/common/src/services/decision/index.ts @@ -18,6 +18,7 @@ export * from './getDecisionBySlug'; // Transition management export { TransitionEngine } from './transitionEngine'; export type { TransitionCheckResult } from './transitionEngine'; +export * from './buildExpectedTransitions'; export * from './createTransitionsForProcess'; export * from './updateTransitionsForProcess'; export * from './transitionMonitor'; @@ -78,3 +79,9 @@ export type { DecisionInstanceData, PhaseInstanceData, } from './schemas/instanceData'; +export type { + DecisionSchemaDefinition, + PhaseDefinition, + PhaseRules, + ProcessConfig, +} from './schemas/types'; diff --git a/packages/common/src/services/decision/transitionEngine.ts b/packages/common/src/services/decision/transitionEngine.ts index 105c1e0b6..04c91dee6 100644 --- a/packages/common/src/services/decision/transitionEngine.ts +++ b/packages/common/src/services/decision/transitionEngine.ts @@ -73,12 +73,6 @@ export class TransitionEngine { const process = instance.process as any; const processSchema = process.processSchema as ProcessSchema; const instanceData = instance.instanceData as InstanceData; - console.log( - 'TRANSITION', - instanceData.currentPhaseId, - instance.currentStateId, - instanceData, - ); const currentStateId = instanceData.currentPhaseId || instance.currentStateId || ''; diff --git a/packages/common/src/services/decision/transitionMonitor.ts b/packages/common/src/services/decision/transitionMonitor.ts index fc436f6af..8d679ec94 100644 --- a/packages/common/src/services/decision/transitionMonitor.ts +++ b/packages/common/src/services/decision/transitionMonitor.ts @@ -1,14 +1,20 @@ -import { db, eq, lte, sql } from '@op/db/client'; +import { and, db, eq, isNull, lte, sql } from '@op/db/client'; import { + ProcessStatus, decisionProcessTransitions, - decisionProcesses, processInstances, } from '@op/db/schema'; import pMap from 'p-map'; import { CommonError } from '../../utils'; -// import { processResults } from './processResults'; -import type { ProcessSchema, StateDefinition } from './types'; +import { processResults } from './processResults'; +import type { DecisionInstanceData } from './schemas/instanceData'; + +/** Result of processing a single transition */ +interface ProcessedTransition { + toStateId: string; + isTransitioningToFinalState: boolean; +} export interface ProcessDecisionsTransitionsResult { processed: number; @@ -33,14 +39,31 @@ export async function processDecisionsTransitions(): Promise + // Query due transitions with process instance data in a single query + // (avoids N+1 re-fetches per transition in processTransition) + const dueTransitions = await db + .select({ + id: decisionProcessTransitions.id, + processInstanceId: decisionProcessTransitions.processInstanceId, + fromStateId: decisionProcessTransitions.fromStateId, + toStateId: decisionProcessTransitions.toStateId, + scheduledDate: decisionProcessTransitions.scheduledDate, + completedAt: decisionProcessTransitions.completedAt, + instanceData: processInstances.instanceData, + }) + .from(decisionProcessTransitions) + .innerJoin( + processInstances, + eq(decisionProcessTransitions.processInstanceId, processInstances.id), + ) + .where( and( - isNull(transitions.completedAt), - lte(transitions.scheduledDate, now), + isNull(decisionProcessTransitions.completedAt), + lte(decisionProcessTransitions.scheduledDate, now), + eq(processInstances.status, ProcessStatus.PUBLISHED), ), - orderBy: (transitions, { asc }) => [asc(transitions.scheduledDate)], - }); + ) + .orderBy(decisionProcessTransitions.scheduledDate); // Group transitions by processInstanceId to avoid race conditions // within the same process instance @@ -57,17 +80,23 @@ export async function processDecisionsTransitions(): Promise { + async ([processInstanceId, transitions]) => { + let lastSuccessfulTransition: ProcessedTransition | null = null; + // Process this process's transitions sequentially for (const transition of transitions) { try { - await processTransition(transition.id); - - result.processed++; + // Process transition (marks completedAt, returns state info). + // Returns null if another worker already completed this transition. + const processed = await processTransition(transition); + if (processed) { + lastSuccessfulTransition = processed; + result.processed++; + } } catch (error) { result.failed++; @@ -81,10 +110,56 @@ export async function processDecisionsTransitions(): Promise { - const transition = await db._query.decisionProcessTransitions.findFirst({ - where: eq(decisionProcessTransitions.id, transitionId), - }); - - if (!transition) { - throw new CommonError( - `Transition not found: ${transitionId}. It may have been deleted or the ID is invalid.`, - ); - } - - if (transition.completedAt) { - return; - } - - // Get the process instance to check if we're transitioning to a final state - const processInstance = await db._query.processInstances.findFirst({ - where: eq(processInstances.id, transition.processInstanceId), - }); - - if (!processInstance) { +async function processTransition(transition: { + id: string; + processInstanceId: string; + toStateId: string; + instanceData: unknown; +}): Promise { + // Determine if transitioning to final state using instanceData.phases + // In the new schema format, the last phase is always the final state + const instanceData = transition.instanceData as DecisionInstanceData; + const phases = instanceData.phases; + + if (!phases || phases.length === 0) { throw new CommonError( - `Process instance not found: ${transition.processInstanceId}`, + `Process instance ${transition.processInstanceId} has no phases defined in instanceData`, ); } - // Get the process schema to check the state type - const process = await db._query.decisionProcesses.findFirst({ - where: eq(decisionProcesses.id, processInstance.processId), - }); - - if (!process) { - throw new CommonError(`Process not found: ${processInstance.processId}`); + // Safe to assert non-null after the length check above + const lastPhaseId = phases[phases.length - 1]!.phaseId; + const isTransitioningToFinalState = transition.toStateId === lastPhaseId; + + // Only mark the transition as completed (state update happens after all transitions). + // The WHERE completedAt IS NULL guard + returning() check prevent double-processing + // if a concurrent monitor run already handled this transition. + const updated = await db + .update(decisionProcessTransitions) + .set({ + completedAt: new Date().toISOString(), + }) + .where( + and( + eq(decisionProcessTransitions.id, transition.id), + isNull(decisionProcessTransitions.completedAt), + ), + ) + .returning({ id: decisionProcessTransitions.id }); + + if (updated.length === 0) { + // Another worker already completed this transition — skip it + return null; } - const processSchema = process.processSchema as ProcessSchema; - const toState = processSchema.states.find( - (state: StateDefinition) => state.id === transition.toStateId, - ); - - const isTransitioningToFinalState = toState?.type === 'final'; - - // Update both the process instance and transition in a single transaction - // to ensure atomicity and prevent partial state updates - // Note: We rely on foreign key constraints to ensure the process instance exists - await db.transaction(async (tx) => { - // Update the process instance to the new state - // Use jsonb_set to update only the currentStateId field without reading the entire instanceData - await tx - .update(processInstances) - .set({ - currentStateId: transition.toStateId, - instanceData: sql`jsonb_set( - ${processInstances.instanceData}, - '{currentStateId}', - to_jsonb(${transition.toStateId}::text) - )`, - }) - .where(eq(processInstances.id, transition.processInstanceId)); + // Return info needed for state update + return { + toStateId: transition.toStateId, + isTransitioningToFinalState, + }; +} - // Mark the transition as completed - await tx - .update(decisionProcessTransitions) - .set({ - completedAt: new Date().toISOString(), - }) - .where(eq(decisionProcessTransitions.id, transition.id)); - }); - - // If transitioning to a final state (results phase), process the results - // This is done AFTER the transition is marked complete to ensure the state change - // is committed even if results processing fails - if (isTransitioningToFinalState) { - try { - console.log( - `Processing results for process instance ${transition.processInstanceId}`, - ); - console.log('RESULT PROCESSING DISABLED'); - // Disabling for now as none are defined in production yet. We need to migrate current processes first. - /* - const result = await processResults({ - processInstanceId: transition.processInstanceId, - }); - - if (!result.success) { - console.error( - `Results processing failed for process instance ${transition.processInstanceId}:`, - result.error, - ); - } else { - console.log( - `Results processed successfully for process instance ${transition.processInstanceId}. Selected ${result.selectedProposalIds.length} proposals.`, - ); - } - */ - } catch (error) { - // Log the error but don't fail the transition - // The transition to the results phase has already been completed - console.error( - `Error processing results for process instance ${transition.processInstanceId}:`, - error, - ); - } - } +/** + * Updates the process instance state to the specified state. + * Called after all transitions for an instance have been processed. + */ +async function updateInstanceState( + processInstanceId: string, + toStateId: string, +): Promise { + await db + .update(processInstances) + .set({ + currentStateId: toStateId, + updatedAt: new Date().toISOString(), + instanceData: sql`jsonb_set( + ${processInstances.instanceData}, + '{currentPhaseId}', + to_jsonb(${toStateId}::text) + )`, + }) + .where(eq(processInstances.id, processInstanceId)); } diff --git a/packages/common/src/services/decision/types.ts b/packages/common/src/services/decision/types.ts index 2eb7c8594..ceb1b428d 100644 --- a/packages/common/src/services/decision/types.ts +++ b/packages/common/src/services/decision/types.ts @@ -1,3 +1,4 @@ +import type { DecisionProcessTransition } from '@op/db/schema'; import type { JSONSchema7 } from 'json-schema'; import type { SelectionPipeline } from './selectionPipeline/types'; @@ -195,3 +196,9 @@ export interface DecisionData { // Decision content should match the decisionDefinition schema [key: string]: unknown; } + +/** A scheduled phase transition (fromStateId/toStateId store phase IDs) */ +export type ScheduledTransition = Pick< + DecisionProcessTransition, + 'processInstanceId' | 'fromStateId' | 'toStateId' | 'scheduledDate' +>; diff --git a/packages/common/src/services/decision/updateDecisionInstance.ts b/packages/common/src/services/decision/updateDecisionInstance.ts index e0aba7f10..bf66f7e3a 100644 --- a/packages/common/src/services/decision/updateDecisionInstance.ts +++ b/packages/common/src/services/decision/updateDecisionInstance.ts @@ -10,6 +10,7 @@ import { assertAccess, permission } from 'access-zones'; import { CommonError, NotFoundError } from '../../utils'; import { getProfileAccessUser } from '../access'; +import { createTransitionsForProcess } from './createTransitionsForProcess'; import { schemaValidator } from './schemaValidator'; import type { DecisionInstanceData, @@ -49,8 +50,8 @@ export const updateDecisionInstance = async ({ user: User; }) => { // Fetch existing instance - const existingInstance = await db._query.processInstances.findFirst({ - where: eq(processInstances.id, instanceId), + const existingInstance = await db.query.processInstances.findFirst({ + where: { id: instanceId }, }); if (!existingInstance) { @@ -178,8 +179,8 @@ export const updateDecisionInstance = async ({ // Only update if there's something to update if (Object.keys(updateData).length === 0) { // Nothing to update, just return the existing profile - const profile = await db._query.profiles.findFirst({ - where: eq(profiles.id, profileId), + const profile = await db.query.profiles.findFirst({ + where: { id: profileId }, with: { processInstance: true, }, @@ -212,14 +213,23 @@ export const updateDecisionInstance = async ({ // Determine the final status (updated or existing) const finalStatus = status ?? existingInstance.status; + const isBeingPublished = + status === ProcessStatus.PUBLISHED && + existingInstance.status === ProcessStatus.DRAFT; // If status is DRAFT, remove all transitions if (finalStatus === ProcessStatus.DRAFT) { await tx .delete(decisionProcessTransitions) .where(eq(decisionProcessTransitions.processInstanceId, instanceId)); + } else if (isBeingPublished) { + // When publishing a draft, create transitions for all date-based phases + await createTransitionsForProcess({ + processInstance: updatedInstance, + tx, + }); } else if (phases && phases.length > 0) { - // If phases were updated and not DRAFT, update the corresponding transitions + // If phases were updated and already published, update the corresponding transitions await updateTransitionsForProcess({ processInstance: updatedInstance, tx, @@ -228,8 +238,8 @@ export const updateDecisionInstance = async ({ }); // Fetch the profile with processInstance joined for the response - const profile = await db._query.profiles.findFirst({ - where: eq(profiles.id, profileId), + const profile = await db.query.profiles.findFirst({ + where: { id: profileId }, with: { processInstance: true, }, diff --git a/packages/common/src/services/decision/updateTransitionsForProcess.ts b/packages/common/src/services/decision/updateTransitionsForProcess.ts index 7e2cf33a7..c7b3f58f5 100644 --- a/packages/common/src/services/decision/updateTransitionsForProcess.ts +++ b/packages/common/src/services/decision/updateTransitionsForProcess.ts @@ -1,10 +1,9 @@ -import { type TransactionType, db, eq } from '@op/db/client'; +import { type TransactionType, db, eq, inArray } from '@op/db/client'; import { decisionProcessTransitions } from '@op/db/schema'; import type { ProcessInstance } from '@op/db/schema'; -import pMap from 'p-map'; import { CommonError } from '../../utils'; -import type { InstanceData, PhaseConfiguration } from './types'; +import { buildExpectedTransitions } from './buildExpectedTransitions'; export interface UpdateTransitionsInput { processInstance: ProcessInstance; @@ -19,10 +18,11 @@ export interface UpdateTransitionsResult { /** * Updates transition records for a process instance when phase dates change. + * Only handles phases with date-based advancement (rules.advancement.method === 'date'). * This function: * - Updates existing transitions with new scheduled dates - * - Creates new transitions for newly added phases - * - Deletes transitions for removed phases + * - Creates new transitions for newly added date-based phases + * - Deletes transitions for phases that no longer use date-based advancement * - Prevents updates to completed transitions (results phase is locked) */ export async function updateTransitionsForProcess({ @@ -32,14 +32,7 @@ export async function updateTransitionsForProcess({ const dbClient = tx ?? db; try { - const instanceData = processInstance.instanceData as InstanceData; - const phases = instanceData.phases; - - if (!phases || phases.length === 0) { - throw new CommonError( - 'Process instance must have at least one phase configured', - ); - } + const expectedTransitions = buildExpectedTransitions(processInstance); // Get existing transitions const existingTransitions = @@ -57,95 +50,88 @@ export async function updateTransitionsForProcess({ deleted: 0, }; - // Build a map of expected transitions from the current phases - const expectedTransitions = phases.flatMap( - (phase: PhaseConfiguration, index: number) => { - const scheduledDate = phase.endDate ?? phase.startDate; + // Calculate transitions to delete upfront (those not in expected set and not completed) + // Use composite key (fromStateId:toStateId) to match both fields + const expectedTransitionKeys = new Set( + expectedTransitions.map( + (transition) => `${transition.fromStateId}:${transition.toStateId}`, + ), + ); + const transitionsToDelete = existingTransitions.filter( + (transition) => + !expectedTransitionKeys.has( + `${transition.fromStateId}:${transition.toStateId}`, + ) && !transition.completedAt, + ); - // Skip phases that have no dates yet — they don't need transitions - if (!scheduledDate) { - return []; - } + // Partition expected transitions into updates and creates + const toCreate: typeof expectedTransitions = []; + const toUpdate: { id: string; scheduledDate: string }[] = []; - const fromStateId = index > 0 ? phases[index - 1]?.phaseId : null; - const toStateId = phase.phaseId; - - return [ - { - fromStateId, - toStateId, - scheduledDate: new Date(scheduledDate).toISOString(), - }, - ]; - }, - ); + for (const expected of expectedTransitions) { + const existing = existingTransitions.find( + (transition) => + transition.fromStateId === expected.fromStateId && + transition.toStateId === expected.toStateId, + ); + + if (existing) { + // If transition is already completed, don't update it (results phase is locked) + if (existing.completedAt) { + continue; + } - // Process each expected transition in parallel - const updateResults = await pMap( - expectedTransitions, - async (expected) => { - // Find matching existing transition by toStateId - const existing = existingTransitions.find( - (t) => t.toStateId === expected.toStateId, - ); - - if (existing) { - // If transition is already completed, don't update it (results phase is locked) - if (existing.completedAt) { - return { action: 'skipped' as const }; - } - - // Update the scheduled date if it changed - if (existing.scheduledDate !== expected.scheduledDate) { - await dbClient - .update(decisionProcessTransitions) - .set({ - scheduledDate: expected.scheduledDate, - }) - .where(eq(decisionProcessTransitions.id, existing.id)); - - return { action: 'updated' as const }; - } - - return { action: 'unchanged' as const }; - } else { - // Create new transition for this phase - await dbClient.insert(decisionProcessTransitions).values({ - processInstanceId: processInstance.id, - fromStateId: expected.fromStateId, - toStateId: expected.toStateId, + // Update the scheduled date if it changed + if (existing.scheduledDate !== expected.scheduledDate) { + toUpdate.push({ + id: existing.id, scheduledDate: expected.scheduledDate, }); - - return { action: 'created' as const }; } - }, - { concurrency: 5 }, - ); - - // Aggregate results - result.updated = updateResults.filter((r) => r.action === 'updated').length; - result.created = updateResults.filter((r) => r.action === 'created').length; + } else { + toCreate.push(expected); + } + } - // Delete transitions that are no longer in the phases list - // But only delete uncompleted transitions - const expectedStateIds = new Set( - expectedTransitions.map((t) => t.toStateId), - ); - const transitionsToDelete = existingTransitions.filter( - (t) => !expectedStateIds.has(t.toStateId) && !t.completedAt, - ); + // Execute all DB operations concurrently + const ops: Promise[] = []; - await pMap( - transitionsToDelete, - async (transition) => { - await dbClient + // Batch delete + if (transitionsToDelete.length > 0) { + const idsToDelete = transitionsToDelete.map((t) => t.id); + ops.push( + dbClient .delete(decisionProcessTransitions) - .where(eq(decisionProcessTransitions.id, transition.id)); - }, - { concurrency: 5 }, - ); + .where(inArray(decisionProcessTransitions.id, idsToDelete)) + .then(() => {}), + ); + } + + // Batch insert + if (toCreate.length > 0) { + ops.push( + dbClient + .insert(decisionProcessTransitions) + .values(toCreate) + .then(() => {}), + ); + } + + // Updates must remain individual (different values per row) + for (const update of toUpdate) { + ops.push( + dbClient + .update(decisionProcessTransitions) + .set({ scheduledDate: update.scheduledDate }) + .where(eq(decisionProcessTransitions.id, update.id)) + .then(() => {}), + ); + } + + await Promise.all(ops); + result.updated = toUpdate.length; + result.created = toCreate.length; result.deleted = transitionsToDelete.length; return result; diff --git a/services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts b/services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts new file mode 100644 index 000000000..2fbadd497 --- /dev/null +++ b/services/api/src/routers/decision/instances/buildExpectedTransitions.test.ts @@ -0,0 +1,159 @@ +import { buildExpectedTransitions } from '@op/common'; +import type { ProcessInstance } from '@op/db/schema'; +import { describe, expect, it } from 'vitest'; + +/** + * Creates a minimal ProcessInstance stub for testing buildExpectedTransitions. + * Only `id` and `instanceData` are used by the function. + */ +function stubInstance( + instanceData: unknown, + id = 'test-instance-id', +): ProcessInstance { + return { id, instanceData } as ProcessInstance; +} + +describe('buildExpectedTransitions', () => { + it('should create transitions for date-based phases', () => { + const instance = stubInstance({ + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'date' } }, + endDate: '2026-06-01T00:00:00.000Z', + }, + { + phaseId: 'phase-b', + rules: { advancement: { method: 'date' } }, + endDate: '2026-07-01T00:00:00.000Z', + }, + { + phaseId: 'phase-c', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + + expect(transitions).toHaveLength(2); + expect(transitions[0]).toEqual({ + processInstanceId: 'test-instance-id', + fromStateId: 'phase-a', + toStateId: 'phase-b', + scheduledDate: '2026-06-01T00:00:00.000Z', + }); + expect(transitions[1]).toEqual({ + processInstanceId: 'test-instance-id', + fromStateId: 'phase-b', + toStateId: 'phase-c', + scheduledDate: '2026-07-01T00:00:00.000Z', + }); + }); + + it('should skip phases without date-based advancement', () => { + const instance = stubInstance({ + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'manual' } }, + endDate: '2026-06-01T00:00:00.000Z', + }, + { + phaseId: 'phase-b', + rules: { advancement: { method: 'date' } }, + endDate: '2026-07-01T00:00:00.000Z', + }, + { + phaseId: 'phase-c', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + + // Only phase-b has date advancement, creating one transition (b→c) + expect(transitions).toHaveLength(1); + expect(transitions[0]!.fromStateId).toBe('phase-b'); + expect(transitions[0]!.toStateId).toBe('phase-c'); + }); + + it('should return empty array when no phases use date advancement', () => { + const instance = stubInstance({ + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'manual' } }, + }, + { + phaseId: 'phase-b', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + expect(transitions).toHaveLength(0); + }); + + it('should return empty array for single phase (no next phase to transition to)', () => { + const instance = stubInstance({ + currentPhaseId: 'only-phase', + phases: [ + { + phaseId: 'only-phase', + rules: { advancement: { method: 'date' } }, + endDate: '2026-06-01T00:00:00.000Z', + }, + ], + }); + + const transitions = buildExpectedTransitions(instance); + expect(transitions).toHaveLength(0); + }); + + it('should throw when phases array is empty', () => { + const instance = stubInstance({ + currentPhaseId: 'x', + phases: [], + }); + + expect(() => buildExpectedTransitions(instance)).toThrow( + 'Process instance must have at least one phase configured', + ); + }); + + it('should throw when phases is undefined', () => { + const instance = stubInstance({ + currentPhaseId: 'x', + }); + + expect(() => buildExpectedTransitions(instance)).toThrow( + 'Process instance must have at least one phase configured', + ); + }); + + it('should throw when date-based phase has no endDate', () => { + const instance = stubInstance( + { + currentPhaseId: 'phase-a', + phases: [ + { + phaseId: 'phase-a', + rules: { advancement: { method: 'date' } }, + // no endDate + }, + { + phaseId: 'phase-b', + }, + ], + }, + 'inst-123', + ); + + expect(() => buildExpectedTransitions(instance)).toThrow( + 'Phase "phase-a" must have an end date for date-based advancement (instance: inst-123)', + ); + }); +}); diff --git a/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts b/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts index 2c0602eae..d852f197e 100644 --- a/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts +++ b/services/api/src/routers/decision/instances/createInstanceFromTemplate.test.ts @@ -102,7 +102,7 @@ describe.concurrent('createInstanceFromTemplate', () => { expect(instanceData.currentPhaseId).toBe('submission'); }); - it('should create transitions for phases with date-based advancement', async ({ + it('should NOT create transitions for DRAFT instances (transitions are created on publish)', async ({ task, onTestFinished, }) => { @@ -152,37 +152,18 @@ describe.concurrent('createInstanceFromTemplate', () => { testData.trackProfileForCleanup(result.id); expect(result.processInstance.id).toBeDefined(); + expect(result.processInstance.status).toBe('draft'); - // Verify transitions were created + // Verify NO transitions were created for DRAFT instance + // Transitions are only created when the instance is published const transitions = await db._query.decisionProcessTransitions.findMany({ where: eq( decisionProcessTransitions.processInstanceId, result.processInstance.id, ), - orderBy: (transitions, { asc }) => [asc(transitions.scheduledDate)], }); - // The simple template has 4 phases with date-based advancement - // Each phase that has a date-based advancement gets a transition created - expect(transitions.length).toBeGreaterThanOrEqual(3); - - // Verify transition details - check that key transitions exist - const submissionToReview = transitions.find( - (t) => t.fromStateId === 'submission' && t.toStateId === 'review', - ); - const reviewToVoting = transitions.find( - (t) => t.fromStateId === 'review' && t.toStateId === 'voting', - ); - const votingToResults = transitions.find( - (t) => t.fromStateId === 'voting' && t.toStateId === 'results', - ); - - expect(submissionToReview).toBeDefined(); - expect(submissionToReview!.completedAt).toBeNull(); - - expect(reviewToVoting).toBeDefined(); - - expect(votingToResults).toBeDefined(); + expect(transitions.length).toBe(0); }); it('should accept phase settings', async ({ task, onTestFinished }) => { diff --git a/services/api/src/routers/decision/instances/transitionEngine.test.ts b/services/api/src/routers/decision/instances/transitionEngine.test.ts new file mode 100644 index 000000000..e38048a6d --- /dev/null +++ b/services/api/src/routers/decision/instances/transitionEngine.test.ts @@ -0,0 +1,341 @@ +import { TransitionEngine, simpleVoting } from '@op/common'; +import { db, eq } from '@op/db/client'; +import { + ProcessStatus, + decisionProcesses, + processInstances, + stateTransitionHistory, + users, +} from '@op/db/schema'; +import { describe, expect, it } from 'vitest'; + +import { appRouter } from '../..'; +import { TestDecisionsDataManager } from '../../../test/helpers/TestDecisionsDataManager'; +import { + createIsolatedSession, + createTestContextWithSession, +} from '../../../test/supabase-utils'; +import { createCallerFactory } from '../../../trpcFactory'; + +const createCaller = createCallerFactory(appRouter); + +async function createAuthenticatedCaller(email: string) { + const { session } = await createIsolatedSession(email); + return createCaller(await createTestContextWithSession(session)); +} + +/** + * Legacy process schema format with explicit states and transitions arrays. + * The TransitionEngine reads `processSchema.transitions` from the process record. + */ +const legacyProcessSchema = { + name: 'Legacy Process', + description: 'A process schema with explicit transitions for testing', + states: [ + { id: 'submission', name: 'Submission', type: 'initial' }, + { id: 'review', name: 'Review', type: 'intermediate' }, + { id: 'voting', name: 'Voting', type: 'intermediate' }, + { id: 'results', name: 'Results', type: 'final' }, + ], + transitions: [ + { + id: 'submission-to-review', + name: 'Submit to Review', + from: 'submission', + to: 'review', + rules: { type: 'manual' }, + }, + { + id: 'review-to-voting', + name: 'Review to Voting', + from: 'review', + to: 'voting', + rules: { type: 'manual' }, + }, + { + id: 'voting-to-results', + name: 'Voting to Results', + from: 'voting', + to: 'results', + rules: { type: 'manual' }, + }, + ], + initialState: 'submission', + decisionDefinition: { type: 'object' }, + proposalTemplate: { type: 'object' }, +}; + +/** + * Creates a process template with legacy ProcessSchema format (with transitions array) + * and an instance with proper instanceData for TransitionEngine tests. + */ +async function createLegacyProcessWithInstance( + testData: TestDecisionsDataManager, + taskId: string, + { + currentPhaseId = 'submission', + processSchema = legacyProcessSchema, + }: { + currentPhaseId?: string; + processSchema?: Record; + } = {}, +) { + const setup = await testData.createDecisionSetup({ instanceCount: 0 }); + + const [userRecord] = await db + .select() + .from(users) + .where(eq(users.email, setup.userEmail)); + + if (!userRecord?.profileId) { + throw new Error('Test user must have a profileId'); + } + + // Create process with legacy schema format + const [process] = await db + .insert(decisionProcesses) + .values({ + name: `Legacy Process ${taskId}`, + description: 'Test process for TransitionEngine', + processSchema, + createdByProfileId: userRecord.profileId, + }) + .returning(); + + const caller = await createAuthenticatedCaller(setup.userEmail); + + // Create a simpleVoting template for createInstanceFromTemplate, then re-link to legacy process + const [simpleTemplate] = await db + .insert(decisionProcesses) + .values({ + name: `Simple Template for Engine ${taskId}`, + description: simpleVoting.description, + processSchema: simpleVoting, + createdByProfileId: userRecord.profileId, + }) + .returning(); + + const instanceResult = await caller.decision.createInstanceFromTemplate({ + templateId: simpleTemplate!.id, + name: `Engine Test ${taskId}`, + }); + + testData.trackProfileForCleanup(instanceResult.id); + + const instanceId = instanceResult.processInstance.id; + + // Re-link the instance to our legacy process and set the desired current state + await db + .update(processInstances) + .set({ + processId: process!.id, + currentStateId: currentPhaseId, + status: ProcessStatus.PUBLISHED, + instanceData: { + currentPhaseId, + phases: [ + { phaseId: 'submission', name: 'Submission' }, + { phaseId: 'review', name: 'Review' }, + { phaseId: 'voting', name: 'Voting' }, + { phaseId: 'results', name: 'Results' }, + ], + }, + }) + .where(eq(processInstances.id, instanceId)); + + return { + instanceId, + processId: process!.id, + user: setup.user, + userEmail: setup.userEmail, + caller, + }; +} + +describe.concurrent('TransitionEngine', () => { + it('should return available transitions from current state', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'submission' }, + ); + + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + user, + }); + + expect(result.canTransition).toBe(true); + expect(result.availableTransitions.length).toBeGreaterThanOrEqual(1); + + const reviewTransition = result.availableTransitions.find( + (t) => t.toStateId === 'review', + ); + expect(reviewTransition).toBeDefined(); + expect(reviewTransition!.transitionName).toBe('Submit to Review'); + expect(reviewTransition!.canExecute).toBe(true); + }); + + it('should return no transitions when at final state', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'results' }, + ); + + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + user, + }); + + expect(result.canTransition).toBe(false); + expect(result.availableTransitions.length).toBe(0); + }); + + it('should filter transitions by toStateId', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'submission' }, + ); + + // Check with a specific toStateId + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + toStateId: 'review', + user, + }); + + expect(result.availableTransitions.length).toBe(1); + expect(result.availableTransitions[0]!.toStateId).toBe('review'); + + // Check with a toStateId that doesn't match any transition from current state + const noResult = await TransitionEngine.checkAvailableTransitions({ + instanceId, + toStateId: 'results', + user, + }); + + expect(noResult.canTransition).toBe(false); + expect(noResult.availableTransitions.length).toBe(0); + }); + + it('should execute a valid manual transition', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { currentPhaseId: 'submission' }, + ); + + const result = await TransitionEngine.executeTransition({ + data: { + instanceId, + toStateId: 'review', + }, + user, + }); + + expect(result).toBeDefined(); + expect(result!.currentStateId).toBe('review'); + + // Verify state transition history was created + const history = await db._query.stateTransitionHistory.findMany({ + where: eq(stateTransitionHistory.processInstanceId, instanceId), + }); + + expect(history.length).toBe(1); + expect(history[0]!.fromStateId).toBe('submission'); + expect(history[0]!.toStateId).toBe('review'); + }); + + it('should reject transition when conditions are not met', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + // Create a process schema with a transition that has a proposalCount condition + const schemaWithConditions = { + ...legacyProcessSchema, + transitions: [ + { + id: 'submission-to-review', + name: 'Submit to Review', + from: 'submission', + to: 'review', + rules: { + type: 'manual', + conditions: [ + { + type: 'proposalCount', + operator: 'greaterThan', + value: 0, + }, + ], + requireAll: true, + }, + }, + ...legacyProcessSchema.transitions.slice(1), + ], + }; + + const { instanceId, user } = await createLegacyProcessWithInstance( + testData, + task.id, + { + currentPhaseId: 'submission', + processSchema: schemaWithConditions, + }, + ); + + // No proposals exist, so the proposalCount > 0 condition should fail + const result = await TransitionEngine.checkAvailableTransitions({ + instanceId, + toStateId: 'review', + user, + }); + + expect(result.availableTransitions.length).toBe(1); + expect(result.availableTransitions[0]!.canExecute).toBe(false); + expect(result.availableTransitions[0]!.failedRules.length).toBeGreaterThan( + 0, + ); + expect(result.canTransition).toBe(false); + }); + + it('should throw NotFoundError for non-existent instance', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const setup = await testData.createDecisionSetup({ instanceCount: 0 }); + + await expect( + TransitionEngine.checkAvailableTransitions({ + instanceId: '00000000-0000-0000-0000-000000000000', + user: setup.user, + }), + ).rejects.toThrow(/not found/i); + }); +}); diff --git a/services/api/src/routers/decision/instances/transitionMonitor.test.ts b/services/api/src/routers/decision/instances/transitionMonitor.test.ts new file mode 100644 index 000000000..c68172139 --- /dev/null +++ b/services/api/src/routers/decision/instances/transitionMonitor.test.ts @@ -0,0 +1,598 @@ +import { + type DecisionInstanceData, + processDecisionsTransitions, + simpleVoting, +} from '@op/common'; +import { db, eq } from '@op/db/client'; +import { + ProcessStatus, + decisionProcessTransitions, + decisionProcesses, + processInstances, + users, +} from '@op/db/schema'; +import { describe, expect, it } from 'vitest'; + +import { appRouter } from '../..'; +import { TestDecisionsDataManager } from '../../../test/helpers/TestDecisionsDataManager'; +import { + createIsolatedSession, + createTestContextWithSession, +} from '../../../test/supabase-utils'; +import { createCallerFactory } from '../../../trpcFactory'; + +const createCaller = createCallerFactory(appRouter); + +async function createAuthenticatedCaller(email: string) { + const { session } = await createIsolatedSession(email); + return createCaller(await createTestContextWithSession(session)); +} + +/** + * Helper to create a template using the simpleVoting schema (4 phases, date-based advancement). + * Returns the template ID and user email. + */ +async function createSimpleTemplate( + testData: TestDecisionsDataManager, + taskId: string, +) { + const setup = await testData.createDecisionSetup({ instanceCount: 0 }); + + const [userRecord] = await db + .select() + .from(users) + .where(eq(users.email, setup.userEmail)); + + if (!userRecord?.profileId) { + throw new Error('Test user must have a profileId'); + } + + const [template] = await db + .insert(decisionProcesses) + .values({ + name: `Simple Template ${taskId}`, + description: simpleVoting.description, + processSchema: simpleVoting, + createdByProfileId: userRecord.profileId, + }) + .returning(); + + return { templateId: template!.id, userEmail: setup.userEmail }; +} + +/** + * Helper to create an instance from a template, publish it, and make transitions due. + * Returns everything needed for transition monitor tests. + */ +async function createPublishedInstanceWithDueTransitions( + testData: TestDecisionsDataManager, + taskId: string, + { + makeDue = true, + phaseDateOffsets, + }: { + makeDue?: boolean; + phaseDateOffsets?: { startOffsetMs: number; endOffsetMs: number }[]; + } = {}, +) { + const { templateId, userEmail } = await createSimpleTemplate( + testData, + taskId, + ); + const caller = await createAuthenticatedCaller(userEmail); + + const now = new Date(); + + // Default: phases spaced 7 days apart in the future + const defaultOffsets = [ + { startOffsetMs: 0, endOffsetMs: 7 * 24 * 60 * 60 * 1000 }, + { + startOffsetMs: 7 * 24 * 60 * 60 * 1000, + endOffsetMs: 14 * 24 * 60 * 60 * 1000, + }, + { + startOffsetMs: 14 * 24 * 60 * 60 * 1000, + endOffsetMs: 21 * 24 * 60 * 60 * 1000, + }, + { startOffsetMs: 21 * 24 * 60 * 60 * 1000, endOffsetMs: 0 }, + ]; + + const offsets = phaseDateOffsets ?? defaultOffsets; + + if (offsets.length !== 4) { + throw new Error( + `phaseDateOffsets must have exactly 4 entries (one per phase), got ${offsets.length}`, + ); + } + + const phases = [ + { + phaseId: 'submission', + startDate: new Date( + now.getTime() + offsets[0]!.startOffsetMs, + ).toISOString(), + endDate: new Date(now.getTime() + offsets[0]!.endOffsetMs).toISOString(), + }, + { + phaseId: 'review', + startDate: new Date( + now.getTime() + offsets[1]!.startOffsetMs, + ).toISOString(), + endDate: new Date(now.getTime() + offsets[1]!.endOffsetMs).toISOString(), + }, + { + phaseId: 'voting', + startDate: new Date( + now.getTime() + offsets[2]!.startOffsetMs, + ).toISOString(), + endDate: new Date(now.getTime() + offsets[2]!.endOffsetMs).toISOString(), + }, + { + phaseId: 'results', + startDate: new Date( + now.getTime() + offsets[3]!.startOffsetMs, + ).toISOString(), + }, + ]; + + // Create instance + const result = await caller.decision.createInstanceFromTemplate({ + templateId, + name: `Monitor Test ${taskId}`, + phases, + }); + + testData.trackProfileForCleanup(result.id); + + const instanceId = result.processInstance.id; + + // Publish the instance (this creates transitions) + await caller.decision.updateDecisionInstance({ + instanceId, + status: ProcessStatus.PUBLISHED, + }); + + if (makeDue) { + // Make all transitions due by setting scheduledDate to past times. + // Stagger them to preserve correct ordering (monitor orders by scheduledDate). + const fetchedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + // Sort by the original scheduled date to maintain creation order + fetchedTransitions.sort( + (a, b) => + new Date(a.scheduledDate).getTime() - + new Date(b.scheduledDate).getTime(), + ); + + for (let i = 0; i < fetchedTransitions.length; i++) { + const staggeredPast = new Date( + now.getTime() - (fetchedTransitions.length - i) * 60 * 60 * 1000, + ).toISOString(); + await db + .update(decisionProcessTransitions) + .set({ scheduledDate: staggeredPast }) + .where(eq(decisionProcessTransitions.id, fetchedTransitions[i]!.id)); + } + } + + // Get the transitions + const transitions = await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + return { + instanceId, + transitions, + userEmail, + caller, + templateId, + }; +} + +// Sequential execution: processDecisionsTransitions is a global function that processes +// ALL due transitions, so concurrent tests would race to process each other's transitions. +describe('processDecisionsTransitions', () => { + it('should process a due transition and update instance state', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId } = await createPublishedInstanceWithDueTransitions( + testData, + task.id, + ); + + // Capture updatedAt before processing + const beforeInstance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + const updatedAtBefore = beforeInstance!.updatedAt; + + const result = await processDecisionsTransitions(); + + expect(result.processed).toBeGreaterThanOrEqual(1); + expect(result.failed).toBe(0); + + // Verify instance state was updated + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + + expect(instance).toBeDefined(); + + // All 3 transitions (submission→review, review→voting, voting→results) were due + // so the instance should have advanced to the final state + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('results'); + expect(instance!.currentStateId).toBe('results'); + + // Verify updatedAt was set by the monitor + expect(instance!.updatedAt).toBeDefined(); + expect(new Date(instance!.updatedAt!).getTime()).toBeGreaterThan( + new Date(updatedAtBefore!).getTime(), + ); + + // Verify transitions are marked completed + const completedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + for (const transition of completedTransitions) { + expect(transition.completedAt).not.toBeNull(); + } + }); + + it('should NOT process transitions for DRAFT instances', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { templateId, userEmail } = await createSimpleTemplate( + testData, + task.id, + ); + const caller = await createAuthenticatedCaller(userEmail); + + const now = new Date(); + const pastDate = new Date(now.getTime() - 24 * 60 * 60 * 1000); // 1 day ago + + // Create instance with past dates (but don't publish) + const result = await caller.decision.createInstanceFromTemplate({ + templateId, + name: `Draft Monitor Test ${task.id}`, + phases: [ + { + phaseId: 'submission', + startDate: new Date( + pastDate.getTime() - 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: pastDate.toISOString(), + }, + { + phaseId: 'review', + startDate: pastDate.toISOString(), + endDate: new Date( + pastDate.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'voting', + startDate: new Date( + pastDate.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: new Date( + pastDate.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'results', + startDate: new Date( + pastDate.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + ], + }); + + testData.trackProfileForCleanup(result.id); + + // Draft instances don't have transitions created at all, + // but let's manually insert one to be sure the monitor skips it + await db.insert(decisionProcessTransitions).values({ + processInstanceId: result.processInstance.id, + fromStateId: 'submission', + toStateId: 'review', + scheduledDate: pastDate.toISOString(), + }); + + await processDecisionsTransitions(); + + // The transition should NOT have been processed because the instance is DRAFT + const transitions = await db._query.decisionProcessTransitions.findMany({ + where: eq( + decisionProcessTransitions.processInstanceId, + result.processInstance.id, + ), + }); + + for (const transition of transitions) { + expect(transition.completedAt).toBeNull(); + } + }); + + it('should NOT process future-dated transitions', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + // Create a published instance with future transitions (don't make them due) + const { instanceId } = await createPublishedInstanceWithDueTransitions( + testData, + task.id, + { + makeDue: false, + }, + ); + + await processDecisionsTransitions(); + + // Future transitions should not be processed + const refreshedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + for (const transition of refreshedTransitions) { + expect(transition.completedAt).toBeNull(); + } + + // Instance should still be in the initial state + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('submission'); + }); + + it('should process multiple due transitions sequentially for same instance', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, transitions } = + await createPublishedInstanceWithDueTransitions(testData, task.id); + + // All 3 transitions should be due (submission→review, review→voting, voting→results) + expect(transitions.length).toBe(3); + + const result = await processDecisionsTransitions(); + + // All 3 should have been processed + expect(result.processed).toBeGreaterThanOrEqual(3); + expect(result.failed).toBe(0); + + // Instance should be at the final state (results) + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('results'); + expect(instance!.currentStateId).toBe('results'); + + // All transitions should be completed + const completedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + expect(completedTransitions.every((t) => t.completedAt !== null)).toBe( + true, + ); + }); + + it('should skip already-completed transitions', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId, transitions } = + await createPublishedInstanceWithDueTransitions(testData, task.id); + + // Manually mark the first transition as completed + const firstTransition = transitions.find( + (t) => t.fromStateId === 'submission', + ); + expect(firstTransition).toBeDefined(); + + const completedTime = new Date().toISOString(); + await db + .update(decisionProcessTransitions) + .set({ completedAt: completedTime }) + .where(eq(decisionProcessTransitions.id, firstTransition!.id)); + + // Also update instance state to match the completed transition + await db + .update(processInstances) + .set({ currentStateId: 'review' }) + .where(eq(processInstances.id, instanceId)); + + const result = await processDecisionsTransitions(); + + // The first transition should not be re-processed + const refreshedFirst = await db._query.decisionProcessTransitions.findFirst( + { + where: eq(decisionProcessTransitions.id, firstTransition!.id), + }, + ); + + // The first transition should still have a completedAt set (it was pre-completed) + expect(refreshedFirst!.completedAt).not.toBeNull(); + + // The remaining 2 transitions should have been processed + // (review→voting, voting→results) + expect(result.processed).toBeGreaterThanOrEqual(2); + }); + + it('should stop processing instance on error and continue others', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + // Create a valid published instance with due transitions + const { instanceId: goodInstanceId } = + await createPublishedInstanceWithDueTransitions(testData, task.id); + + // Create another instance and corrupt its data + const { templateId, userEmail } = await createSimpleTemplate( + testData, + `${task.id}-bad`, + ); + const caller2 = await createAuthenticatedCaller(userEmail); + + const now = new Date(); + const badResult = await caller2.decision.createInstanceFromTemplate({ + templateId, + name: `Bad Monitor Test ${task.id}`, + phases: [ + { + phaseId: 'submission', + startDate: now.toISOString(), + endDate: new Date( + now.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'review', + startDate: new Date( + now.getTime() + 7 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: new Date( + now.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'voting', + startDate: new Date( + now.getTime() + 14 * 24 * 60 * 60 * 1000, + ).toISOString(), + endDate: new Date( + now.getTime() + 21 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + { + phaseId: 'results', + startDate: new Date( + now.getTime() + 21 * 24 * 60 * 60 * 1000, + ).toISOString(), + }, + ], + }); + + testData.trackProfileForCleanup(badResult.id); + const badInstanceId = badResult.processInstance.id; + + // Publish the bad instance + await caller2.decision.updateDecisionInstance({ + instanceId: badInstanceId, + status: ProcessStatus.PUBLISHED, + }); + + // Make its transitions due with staggered past dates + const badTransitions = await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, badInstanceId), + }); + + badTransitions.sort( + (a, b) => + new Date(a.scheduledDate).getTime() - + new Date(b.scheduledDate).getTime(), + ); + + for (let i = 0; i < badTransitions.length; i++) { + const staggeredPast = new Date( + now.getTime() - (badTransitions.length - i) * 60 * 60 * 1000, + ).toISOString(); + await db + .update(decisionProcessTransitions) + .set({ scheduledDate: staggeredPast }) + .where(eq(decisionProcessTransitions.id, badTransitions[i]!.id)); + } + + // Corrupt the bad instance's data by setting instanceData to have empty phases + await db + .update(processInstances) + .set({ + instanceData: { currentPhaseId: 'submission', phases: [] }, + }) + .where(eq(processInstances.id, badInstanceId)); + + const monitorResult = await processDecisionsTransitions(); + + // The bad instance should have failed, but the good instance should succeed + expect(monitorResult.failed).toBeGreaterThanOrEqual(1); + expect(monitorResult.processed).toBeGreaterThanOrEqual(1); + expect(monitorResult.errors.length).toBeGreaterThanOrEqual(1); + + // Verify the good instance was processed + const goodInstance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, goodInstanceId), + }); + expect(goodInstance!.currentStateId).toBe('results'); + }); + + it('should handle concurrent workers without double-processing', async ({ + task, + onTestFinished, + }) => { + const testData = new TestDecisionsDataManager(task.id, onTestFinished); + + const { instanceId } = await createPublishedInstanceWithDueTransitions( + testData, + task.id, + ); + + // Run two monitor invocations concurrently (simulates two Inngest workers) + const [result1, result2] = await Promise.all([ + processDecisionsTransitions(), + processDecisionsTransitions(), + ]); + + // Between the two runs, exactly 3 transitions should have been processed total + const totalProcessed = result1.processed + result2.processed; + expect(totalProcessed).toBeGreaterThanOrEqual(3); + + // Neither run should have failures + expect(result1.failed).toBe(0); + expect(result2.failed).toBe(0); + + // Instance should be at the final state + const instance = await db._query.processInstances.findFirst({ + where: eq(processInstances.id, instanceId), + }); + const instanceData = instance!.instanceData as DecisionInstanceData; + expect(instanceData.currentPhaseId).toBe('results'); + expect(instance!.currentStateId).toBe('results'); + + // Each transition should have completedAt set exactly once + const completedTransitions = + await db._query.decisionProcessTransitions.findMany({ + where: eq(decisionProcessTransitions.processInstanceId, instanceId), + }); + + expect(completedTransitions).toHaveLength(3); + for (const transition of completedTransitions) { + expect(transition.completedAt).not.toBeNull(); + } + }); +});