From 781f0b4a723e62d7b09605fc25bf67e88e242ef7 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Tue, 16 Dec 2025 17:37:04 -0500 Subject: [PATCH 1/2] eng-1121-endSyncTask erro ---
Review notes for patch 1/2 (free text after the "---" separator; not part of the commit message or the diff):
 * Migration: adds a nullable "last_success_start" (timestamp with time zone) column to public.sync_info,
   marks "last_task_start" NOT NULL, then copies last_task_start into last_success_start for every row
   whose last_task_end is not null.
 * end_sync_task: when s_status = 'complete' it now also copies the row's last_task_start into
   last_success_start (alongside setting last_task_end to now() and resetting failure_count to 0);
   the UPDATE writes last_success_start back in every case.
 * propose_sync_task: when an existing row is taken over, the returned value is the stored
   last_success_start instead of max(last_task_start) over rows with status 'complete'.
   In this patch the migration copy of the function still runs that max() query in the new-row branch.
 * dbTypes.ts: adds last_success_start to the Row / Insert / Update shapes of one table type
   (the table name is outside the hunk context; presumably sync_info - confirm).
 * NOTE(review): "set not null" on last_task_start is issued without any preceding backfill of that
   column; it will presumably fail if existing rows hold NULL there - confirm against production data.
 * NOTE(review): the backfill "update sync_info" is not schema-qualified, unlike the two ALTER
   statements; it presumably relies on the migration's search_path - confirm.
packages/database/src/dbTypes.ts | 3 + ...20251216222945_sync_last_success_start.sql | 120 ++++++++++++++++++ packages/database/supabase/schemas/sync.sql | 23 ++-- 3 files changed, 136 insertions(+), 10 deletions(-) create mode 100644 packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql diff --git a/packages/database/src/dbTypes.ts b/packages/database/src/dbTypes.ts index 0ae2f4258..132a482cf 100644 --- a/packages/database/src/dbTypes.ts +++ b/packages/database/src/dbTypes.ts @@ -652,6 +652,7 @@ export type Database = { Row: { failure_count: number | null id: number + last_success_start: string | null last_task_end: string | null last_task_start: string | null status: Database["public"]["Enums"]["task_status"] | null @@ -664,6 +665,7 @@ export type Database = { Insert: { failure_count?: number | null id?: number + last_success_start?: string | null last_task_end?: string | null last_task_start?: string | null status?: Database["public"]["Enums"]["task_status"] | null @@ -676,6 +678,7 @@ export type Database = { Update: { failure_count?: number | null id?: number + last_success_start?: string | null last_task_end?: string | null last_task_start?: string | null status?: Database["public"]["Enums"]["task_status"] | null diff --git a/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql b/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql new file mode 100644 index 000000000..3a675069e --- /dev/null +++ b/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql @@ -0,0 +1,120 @@ + +alter table "public"."sync_info" add column "last_success_start" timestamp with time zone; + +alter table "public"."sync_info" alter column "last_task_start" set not null; + +update sync_info set 
last_success_start = last_task_start where last_task_end is not null; + +CREATE OR REPLACE FUNCTION public.end_sync_task(s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status) + RETURNS void + LANGUAGE plpgsql + SET search_path TO '' +AS $function$ +DECLARE t_id INTEGER; +DECLARE t_worker varchar; +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +BEGIN + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + ASSERT s_status > 'active'; + ASSERT t_worker = s_worker, 'Wrong worker'; + ASSERT s_status >= t_status, 'do not go back in status'; + IF s_status = 'complete' THEN + t_last_task_end := now(); + t_last_success_start := t_last_task_start; + t_failure_count := 0; + ELSE + IF t_status != s_status THEN + t_failure_count := t_failure_count + 1; + END IF; + END IF; + + UPDATE public.sync_info + SET status = s_status, + task_times_out_at=null, + last_task_end=t_last_task_end, + last_success_start=t_last_success_start, + failure_count=t_failure_count + WHERE id=t_id; +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION public.propose_sync_task(s_target bigint, s_function character varying, s_worker character varying, timeout interval, task_interval interval) + RETURNS timestamp with time zone + LANGUAGE plpgsql + SET search_path TO '' +AS $function$ +DECLARE s_id INTEGER; +DECLARE start_time TIMESTAMP WITH TIME ZONE := now(); +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; 
+DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; +DECLARE result TIMESTAMP WITH TIME ZONE; +BEGIN + ASSERT timeout * 2 < task_interval; + ASSERT timeout >= '1s'::interval; + ASSERT task_interval >= '5s'::interval; + INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at) + VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout) + ON CONFLICT (sync_target, sync_function) DO NOTHING + RETURNING id INTO s_id; + IF s_id IS NOT NULL THEN + -- totally new_row, I'm on the task + -- return last time it was run successfully + SELECT max(last_task_start) INTO result FROM public.sync_info + WHERE sync_target = s_target + AND sync_function = s_function + AND status = 'complete'; + RETURN result; + END IF; + -- now we know it pre-existed. Maybe already active. + SELECT id INTO STRICT s_id + FROM public.sync_info + WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; + SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start + INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start + FROM public.sync_info + WHERE id = s_id; + + IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN + t_status := 'timeout'; + t_failure_count := t_failure_count + 1; + END IF; + -- basic backoff + task_interval := task_interval * (1+t_failure_count); + IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN + -- we are ready to take on the task + result := t_last_success_start; + UPDATE public.sync_info + SET worker=s_worker, + status='active', + task_times_out_at = now() + timeout, + last_task_start = start_time, + failure_count=t_failure_count, + last_task_end = NULL + WHERE id=s_id; + ELSE + -- the task has been tried recently enough + IF t_status = 
'timeout' THEN + UPDATE public.sync_info + SET status=t_status, failure_count=t_failure_count + WHERE id=s_id; + END IF; + result := coalesce(t_last_task_end, t_last_task_start) + task_interval; + END IF; + + RETURN result; +END; +$function$ +; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 8457b5562..1b26a09ef 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -15,9 +15,10 @@ CREATE TABLE IF NOT EXISTS public.sync_info ( status public.task_status DEFAULT 'active'::public.task_status, worker character varying(100) NOT NULL, failure_count smallint DEFAULT 0, - last_task_start timestamp with time zone, + last_task_start timestamp with time zone NOT NULL, last_task_end timestamp with time zone, - task_times_out_at timestamp with time zone + task_times_out_at timestamp with time zone, + last_success_start timestamp with time zone ); ALTER TABLE public.sync_info OWNER TO "postgres"; @@ -58,16 +59,19 @@ DECLARE t_id INTEGER; DECLARE t_worker varchar; DECLARE t_status public.task_status; DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; BEGIN - SELECT id, worker, status, failure_count, last_task_end - INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_end + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; ASSERT s_status > 'active'; ASSERT t_worker = s_worker, 'Wrong worker'; ASSERT s_status >= t_status, 'do not go back in status'; IF s_status = 'complete' THEN t_last_task_end := now(); + t_last_success_start := t_last_task_start; t_failure_count := 0; ELSE IF 
t_status != s_status THEN @@ -79,6 +83,7 @@ BEGIN SET status = s_status, task_times_out_at=null, last_task_end=t_last_task_end, + last_success_start=t_last_success_start, failure_count=t_failure_count WHERE id=t_id; END; @@ -109,6 +114,7 @@ DECLARE t_failure_count SMALLINT; DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; DECLARE result TIMESTAMP WITH TIME ZONE; BEGIN ASSERT timeout * 2 < task_interval; @@ -132,8 +138,8 @@ BEGIN FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function FOR UPDATE; - SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at - INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at + SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start + INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start FROM public.sync_info WHERE id = s_id; @@ -145,10 +151,7 @@ BEGIN task_interval := task_interval * (1+t_failure_count); IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN -- we are ready to take on the task - SELECT max(last_task_start) INTO result FROM public.sync_info - WHERE sync_target = s_target - AND sync_function = s_function - AND status = 'complete'; + result := t_last_success_start; UPDATE public.sync_info SET worker=s_worker, status='active', From b6d316f9d5ca30349ca297c1d72280a97f4becdd Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Tue, 16 Dec 2025 20:14:24 -0500 Subject: [PATCH 2/2] simplify a path ---
Review notes for patch 2/2 (free text after the "---" separator; not part of the commit message or the diff):
 * In both the migration and schemas/sync.sql, the propose_sync_task branch taken when the INSERT
   actually created the row (s_id IS NOT NULL) now returns NULL directly instead of selecting
   max(last_task_start) over rows with status 'complete'.
 * NOTE(review): presumably behavior-preserving - the row was just inserted with status 'active', and
   ON CONFLICT (sync_target, sync_function) suggests at most one row per pair, so the removed query
   could only have produced NULL. Confirm the unique constraint exists on that pair.
.../20251216222945_sync_last_success_start.sql | 9 ++------- packages/database/supabase/schemas/sync.sql | 9 ++------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql 
b/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql index 3a675069e..a0264a45a 100644 --- a/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql +++ b/packages/database/supabase/migrations/20251216222945_sync_last_success_start.sql @@ -69,13 +69,8 @@ BEGIN ON CONFLICT (sync_target, sync_function) DO NOTHING RETURNING id INTO s_id; IF s_id IS NOT NULL THEN - -- totally new_row, I'm on the task - -- return last time it was run successfully - SELECT max(last_task_start) INTO result FROM public.sync_info - WHERE sync_target = s_target - AND sync_function = s_function - AND status = 'complete'; - RETURN result; + -- totally new_row, no previous success. + RETURN NULL; END IF; -- now we know it pre-existed. Maybe already active. SELECT id INTO STRICT s_id diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 1b26a09ef..6ae25b7f3 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -125,13 +125,8 @@ BEGIN ON CONFLICT (sync_target, sync_function) DO NOTHING RETURNING id INTO s_id; IF s_id IS NOT NULL THEN - -- totally new_row, I'm on the task - -- return last time it was run successfully - SELECT max(last_task_start) INTO result FROM public.sync_info - WHERE sync_target = s_target - AND sync_function = s_function - AND status = 'complete'; - RETURN result; + -- totally new_row, no previous success. + RETURN NULL; END IF; -- now we know it pre-existed. Maybe already active. SELECT id INTO STRICT s_id