Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/bun-postgres-runtime/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "@yieldstar/bun-postgres-runtime",
"version": "0.4.4",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "bun build src/index.ts --outfile dist/index.js --target bun --packages=external"
},
"dependencies": {
"@yieldstar/core": "workspace:*",
"pino": "^9.6.0"
},
"devDependencies": {
"@types/bun": "^1.2.4",
"yieldstar": "workspace:*"
}
}
69 changes: 69 additions & 0 deletions packages/bun-postgres-runtime/src/dao/step-response-dao.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import type { SQL } from "bun";

interface StepResponseRow {
execution_id: string;
step_key: string;
step_attempt: number;
step_done: boolean;
step_response: string;
}

export class StepResponsesDao {
private ready: Promise<void>;
constructor(private sql: SQL) {
this.ready = this.setupDb();
}

private async setupDb() {
await this.sql`
CREATE TABLE IF NOT EXISTS step_responses (
execution_id TEXT NOT NULL,
step_key TEXT NOT NULL,
step_attempt INTEGER NOT NULL,
step_done BOOLEAN NOT NULL,
step_response JSONB,
PRIMARY KEY (execution_id, step_key, step_attempt)
)
`;
await this.sql`
CREATE INDEX IF NOT EXISTS idx_execution_step_attempt
ON step_responses(execution_id, step_key, step_attempt DESC)
`;
}

async getAllSteps() {
await this.ready;
return this.sql`SELECT * FROM step_responses`;
}

async deleteAll() {
await this.ready;
await this.sql`DELETE FROM step_responses`;
}

async getLatestStepResponse(executionId: string, stepKey: string) {
await this.ready;
const rows = await this.sql`
SELECT * FROM step_responses
WHERE execution_id = ${executionId}
AND step_key = ${stepKey}
ORDER BY step_attempt DESC LIMIT 1
` as StepResponseRow[];

return rows[0];
}

async insertStepResponse(
executionId: string,
stepKey: string,
stepAttempt: number,
stepDone: boolean,
stepResponseJson: string
) {
await this.ready;
await this.sql`
INSERT INTO step_responses (execution_id, step_key, step_attempt, step_done, step_response)
VALUES (${executionId}, ${stepKey}, ${stepAttempt}, ${stepDone}, ${stepResponseJson})
`;
}
}
68 changes: 68 additions & 0 deletions packages/bun-postgres-runtime/src/dao/task-queue-dao.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { SQL } from "bun";
import type { WorkflowEvent } from "@yieldstar/core";

interface TaskRow {
task_id: number;
workflow_id: string;
execution_id: string;
params?: string;
context?: string;
visible_from: number;
}

interface CountRow { count: number }

export class TaskQueueDao {
private ready: Promise<void>;
constructor(private sql: SQL) {
this.ready = this.setupDb();
}

private async setupDb() {
await this.sql`
CREATE TABLE IF NOT EXISTS task_queue (
task_id SERIAL PRIMARY KEY,
workflow_id TEXT NOT NULL,
execution_id TEXT NOT NULL,
params TEXT,
context TEXT,
visible_from BIGINT DEFAULT (EXTRACT(EPOCH FROM NOW()) * 1000)
)
`;
}

async insertTask(event: WorkflowEvent) {
await this.ready;
await this.sql`
INSERT INTO task_queue (workflow_id, execution_id, params, context, visible_from)
VALUES (${event.workflowId}, ${event.executionId}, ${event.params ? JSON.stringify(event.params) : null}, ${event.context ? JSON.stringify(Array.from(event.context.entries())) : null}, ${Date.now()})
`;
}

async getNextTask() {
await this.ready;
const now = Date.now();
const rows = await this.sql`
SELECT * FROM task_queue WHERE visible_from <= ${now} ORDER BY task_id LIMIT 1
` as TaskRow[];
return rows[0];
}

async updateTaskVisibility(taskId: number, visibleFrom: number) {
await this.ready;
await this.sql`
UPDATE task_queue SET visible_from = ${visibleFrom} WHERE task_id = ${taskId}
`;
}

async deleteTaskById(id: number) {
await this.ready;
await this.sql`DELETE FROM task_queue WHERE task_id = ${id}`;
}

async getTaskCount() {
await this.ready;
const rows = await this.sql`SELECT COUNT(*)::int as count FROM task_queue WHERE visible_from <= ${Date.now()}` as CountRow[];
return Number(rows[0]?.count ?? 0);
}
}
67 changes: 67 additions & 0 deletions packages/bun-postgres-runtime/src/dao/timers-dao.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { SQL } from "bun";
import type { WorkflowEvent } from "@yieldstar/core";

interface TimerRow {
id: number;
delay: number;
workflow_id: string;
execution_id: string;
created_at: number;
params?: string;
context?: string;
}

interface CountRow { count: number }

export class TimersDao {
private ready: Promise<void>;
constructor(private sql: SQL) {
this.ready = this.setupDb();
}

private async setupDb() {
await this.sql`
CREATE TABLE IF NOT EXISTS scheduled_tasks (
id SERIAL PRIMARY KEY,
delay INTEGER NOT NULL,
workflow_id TEXT NOT NULL,
execution_id TEXT NOT NULL,
created_at BIGINT NOT NULL,
params TEXT,
context TEXT
)
`;
}

async getExpiredTimers(): Promise<TimerRow[]> {
await this.ready;
const now = Date.now();
return this.sql`
SELECT id, delay, workflow_id, execution_id, created_at, params, context
FROM scheduled_tasks
WHERE (created_at + delay) < ${now}
` as any as TimerRow[];
}

async getTaskCount() {
await this.ready;
const rows = (await this.sql`
SELECT COUNT(*)::int as count FROM scheduled_tasks
WHERE (created_at + delay) < ${Date.now()}
`) as CountRow[];
return Number(rows[0]?.count ?? 0);
}

async insertTimer(event: WorkflowEvent, delay: number) {
await this.ready;
await this.sql`
INSERT INTO scheduled_tasks (delay, workflow_id, execution_id, created_at, params, context)
VALUES (${delay}, ${event.workflowId}, ${event.executionId}, ${Date.now()}, ${event.params ? JSON.stringify(event.params) : null}, ${JSON.stringify(Array.from(event.context.entries()))})
`;
}

async deleteTimerById(id: number) {
await this.ready;
await this.sql`DELETE FROM scheduled_tasks WHERE id = ${id}`;
}
}
5 changes: 5 additions & 0 deletions packages/bun-postgres-runtime/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export { PostgresHeapClient } from "./postgres-heap";
export { PostgresSchedulerClient } from "./postgres-scheduler";
export { PostgresEventLoop } from "./postgres-event-loop";
export { PostgresTaskQueueClient } from "./postgres-task-queue";
export { PostgresTimersClient } from "./postgres-timers";
41 changes: 41 additions & 0 deletions packages/bun-postgres-runtime/src/postgres-event-loop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import type { SQL } from "bun";
import { PostgresTaskQueue } from "./postgres-task-queue";
import { PostgresTimers } from "./postgres-timers";
import type { EventProcessor } from "@yieldstar/core";
import type { Logger } from "pino";

export class PostgresEventLoop {
taskQueue: PostgresTaskQueue;
timers: PostgresTimers;
private isRunning: boolean = false;

constructor(sql: SQL) {
this.taskQueue = new PostgresTaskQueue(sql);
this.timers = new PostgresTimers({ sql, taskQueue: this.taskQueue });
}

start(params: { onNewEvent: EventProcessor; logger: Logger }) {
this.isRunning = true;
this.loop(params.onNewEvent, params.logger);
}

stop() {
this.isRunning = false;
}

private async loop(processEvent: EventProcessor, logger: Logger) {
if (!this.isRunning) return;

while (!(await this.taskQueue.isEmpty())) {
const task = await this.taskQueue.process();
if (task) {
await processEvent(task.event, logger);
await this.taskQueue.remove(task.taskId);
}
}

await this.timers.processTimers();

setTimeout(() => this.loop(processEvent, logger), 10);
}
}
54 changes: 54 additions & 0 deletions packages/bun-postgres-runtime/src/postgres-heap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { HeapClient } from "@yieldstar/core";
import type { SQL } from "bun";
import { StepResponsesDao } from "./dao/step-response-dao";

export class PostgresHeapClient implements HeapClient {
private stepResponsesDao: StepResponsesDao;

constructor(sql: SQL) {
this.stepResponsesDao = new StepResponsesDao(sql);
}

async getAllSteps() {
return this.stepResponsesDao.getAllSteps();
}

async deleteAll() {
await this.stepResponsesDao.deleteAll();
}

async readStep(params: { executionId: string; stepKey: string }) {
const result = await this.stepResponsesDao.getLatestStepResponse(
params.executionId,
params.stepKey
);

if (!result?.step_response) {
return null;
}

return {
stepResponseJson: result.step_response,
meta: {
attempt: result.step_attempt,
done: Boolean(result.step_done),
},
};
}

async writeStep(params: {
executionId: string;
stepKey: string;
stepAttempt: number;
stepDone: boolean;
stepResponseJson: string;
}) {
await this.stepResponsesDao.insertStepResponse(
params.executionId,
params.stepKey,
params.stepAttempt,
params.stepDone,
params.stepResponseJson
);
}
}
Loading