Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/database/src/dbTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ export type Database = {
Row: {
failure_count: number | null
id: number
last_success_start: string | null
last_task_end: string | null
last_task_start: string | null
status: Database["public"]["Enums"]["task_status"] | null
Expand All @@ -664,6 +665,7 @@ export type Database = {
Insert: {
failure_count?: number | null
id?: number
last_success_start?: string | null
last_task_end?: string | null
last_task_start?: string | null
status?: Database["public"]["Enums"]["task_status"] | null
Expand All @@ -676,6 +678,7 @@ export type Database = {
Update: {
failure_count?: number | null
id?: number
last_success_start?: string | null
last_task_end?: string | null
last_task_start?: string | null
status?: Database["public"]["Enums"]["task_status"] | null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@

alter table "public"."sync_info" add column "last_success_start" timestamp with time zone;

alter table "public"."sync_info" alter column "last_task_start" set not null;

update sync_info set last_success_start = last_task_start where last_task_end is not null;

CREATE OR REPLACE FUNCTION public.end_sync_task(s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status)
RETURNS void
LANGUAGE plpgsql
SET search_path TO ''
AS $function$
DECLARE t_id INTEGER;
DECLARE t_worker varchar;
DECLARE t_status public.task_status;
DECLARE t_failure_count SMALLINT;
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
BEGIN
SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
ASSERT s_status > 'active';
ASSERT t_worker = s_worker, 'Wrong worker';
ASSERT s_status >= t_status, 'do not go back in status';
IF s_status = 'complete' THEN
t_last_task_end := now();
t_last_success_start := t_last_task_start;
t_failure_count := 0;
ELSE
IF t_status != s_status THEN
t_failure_count := t_failure_count + 1;
END IF;
END IF;

UPDATE public.sync_info
SET status = s_status,
task_times_out_at=null,
last_task_end=t_last_task_end,
last_success_start=t_last_success_start,
failure_count=t_failure_count
WHERE id=t_id;
END;
$function$
;


CREATE OR REPLACE FUNCTION public.propose_sync_task(s_target bigint, s_function character varying, s_worker character varying, timeout interval, task_interval interval)
RETURNS timestamp with time zone
LANGUAGE plpgsql
SET search_path TO ''
AS $function$
DECLARE s_id INTEGER;
DECLARE start_time TIMESTAMP WITH TIME ZONE := now();
DECLARE t_status public.task_status;
DECLARE t_failure_count SMALLINT;
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE;
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
DECLARE result TIMESTAMP WITH TIME ZONE;
BEGIN
ASSERT timeout * 2 < task_interval;
ASSERT timeout >= '1s'::interval;
ASSERT task_interval >= '5s'::interval;
INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at)
VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout)
ON CONFLICT (sync_target, sync_function) DO NOTHING
RETURNING id INTO s_id;
IF s_id IS NOT NULL THEN
-- totally new_row, no previous success.
RETURN NULL;
END IF;
-- now we know it pre-existed. Maybe already active.
SELECT id INTO STRICT s_id
FROM public.sync_info
WHERE sync_target = s_target AND sync_function = s_function
FOR UPDATE;
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start
FROM public.sync_info
WHERE id = s_id;

IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN
t_status := 'timeout';
t_failure_count := t_failure_count + 1;
END IF;
-- basic backoff
task_interval := task_interval * (1+t_failure_count);
IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN
-- we are ready to take on the task
result := t_last_success_start;
UPDATE public.sync_info
SET worker=s_worker,
status='active',
task_times_out_at = now() + timeout,
last_task_start = start_time,
failure_count=t_failure_count,
last_task_end = NULL
WHERE id=s_id;
ELSE
-- the task has been tried recently enough
IF t_status = 'timeout' THEN
UPDATE public.sync_info
SET status=t_status, failure_count=t_failure_count
WHERE id=s_id;
END IF;
result := coalesce(t_last_task_end, t_last_task_start) + task_interval;
END IF;

RETURN result;
END;
$function$
;
32 changes: 15 additions & 17 deletions packages/database/supabase/schemas/sync.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ CREATE TABLE IF NOT EXISTS public.sync_info (
status public.task_status DEFAULT 'active'::public.task_status,
worker character varying(100) NOT NULL,
failure_count smallint DEFAULT 0,
last_task_start timestamp with time zone,
last_task_start timestamp with time zone NOT NULL,
last_task_end timestamp with time zone,
task_times_out_at timestamp with time zone
task_times_out_at timestamp with time zone,
last_success_start timestamp with time zone
);

ALTER TABLE public.sync_info OWNER TO "postgres";
Expand Down Expand Up @@ -58,16 +59,19 @@ DECLARE t_id INTEGER;
DECLARE t_worker varchar;
DECLARE t_status public.task_status;
DECLARE t_failure_count SMALLINT;
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
BEGIN
SELECT id, worker, status, failure_count, last_task_end
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_end
SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
ASSERT s_status > 'active';
ASSERT t_worker = s_worker, 'Wrong worker';
ASSERT s_status >= t_status, 'do not go back in status';
IF s_status = 'complete' THEN
t_last_task_end := now();
t_last_success_start := t_last_task_start;
t_failure_count := 0;
ELSE
IF t_status != s_status THEN
Expand All @@ -79,6 +83,7 @@ BEGIN
SET status = s_status,
task_times_out_at=null,
last_task_end=t_last_task_end,
last_success_start=t_last_success_start,
failure_count=t_failure_count
WHERE id=t_id;
END;
Expand Down Expand Up @@ -109,6 +114,7 @@ DECLARE t_failure_count SMALLINT;
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE;
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
DECLARE result TIMESTAMP WITH TIME ZONE;
BEGIN
ASSERT timeout * 2 < task_interval;
Expand All @@ -119,21 +125,16 @@ BEGIN
ON CONFLICT (sync_target, sync_function) DO NOTHING
RETURNING id INTO s_id;
IF s_id IS NOT NULL THEN
-- totally new_row, I'm on the task
-- return last time it was run successfully
SELECT max(last_task_start) INTO result FROM public.sync_info
WHERE sync_target = s_target
AND sync_function = s_function
AND status = 'complete';
RETURN result;
-- totally new_row, no previous success.
RETURN NULL;
END IF;
-- now we know it pre-existed. Maybe already active.
SELECT id INTO STRICT s_id
FROM public.sync_info
WHERE sync_target = s_target AND sync_function = s_function
FOR UPDATE;
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start
FROM public.sync_info
WHERE id = s_id;

Expand All @@ -145,10 +146,7 @@ BEGIN
task_interval := task_interval * (1+t_failure_count);
IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN
-- we are ready to take on the task
SELECT max(last_task_start) INTO result FROM public.sync_info
WHERE sync_target = s_target
AND sync_function = s_function
AND status = 'complete';
result := t_last_success_start;
UPDATE public.sync_info
SET worker=s_worker,
status='active',
Expand Down