diff --git a/package.json b/package.json index 5f95333f..b50d6e57 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "explain-error": "^1.0.4", "express": "^5.0.0", "express-session": "^1.18.1", + "generic-pool": "^3.9.0", "has": "^1.0.3", "helmet": "^8.0.0", "htmlescape": "^1.1.1", diff --git a/src/Uwave.js b/src/Uwave.js index d09a23de..33a6edf4 100644 --- a/src/Uwave.js +++ b/src/Uwave.js @@ -3,7 +3,7 @@ import { promisify } from 'node:util'; import Redis from 'ioredis'; import avvio from 'avvio'; import { pino } from 'pino'; -import { CamelCasePlugin, Kysely, SqliteDialect } from 'kysely'; +import { CamelCasePlugin, Kysely } from 'kysely'; import httpApi, { errorHandling } from './HttpApi.js'; import SocketServer from './SocketServer.js'; import { Source } from './Source.js'; @@ -20,7 +20,7 @@ import acl from './plugins/acl.js'; import waitlist from './plugins/waitlist.js'; import passport from './plugins/passport.js'; import migrations from './plugins/migrations.js'; -import { SqliteDateColumnsPlugin, connect as connectSqlite } from './utils/sqlite.js'; +import { SqliteDateColumnsPlugin, SqliteDialect, connect as connectSqlite } from './utils/sqlite.js'; const DEFAULT_SQLITE_PATH = './uwave.sqlite'; const DEFAULT_REDIS_URL = 'redis://localhost:6379'; @@ -154,6 +154,7 @@ class UwaveServer extends EventEmitter { this.db = new Kysely({ dialect: new SqliteDialect({ database: () => connectSqlite(options.sqlite ?? DEFAULT_SQLITE_PATH), + logger: this.logger.child({ ns: 'uwave:sqlite' }), }), // dialect: new PostgresDialect({ // pool: new pg.Pool({ diff --git a/src/plugins/booth.js b/src/plugins/booth.js index 004e2919..83c076b9 100644 --- a/src/plugins/booth.js +++ b/src/plugins/booth.js @@ -493,7 +493,12 @@ class Booth { const result = this.#locker.using( [REDIS_ADVANCING], 10_000, - (signal) => this.#advanceLocked({ ...opts, signal }), + (signal) => { + return this.#uw.db.transaction() + .execute(async (tx) => { + return this.#advanceLocked({ ...opts, signal }, tx); + }); + }, ); this.#awaitAdvance = result; return result; diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index 40d89e8e..42e0d8b9 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -1,4 +1,6 @@ import lodash from 'lodash'; +import * as ky from 'kysely'; +import { createPool } from 'generic-pool'; import { sql, OperationNodeTransformer } from 'kysely'; /** @@ -236,3 +238,188 @@ export async function connect(path) { export function isForeignKeyError(err) { return err instanceof Error && 'code' in err && err.code === 'SQLITE_CONSTRAINT_FOREIGNKEY'; } + +/** + * @typedef {{ + * database: () => Promise, + * logger?: import('pino').Logger, + * }} SqliteDialectConfig */ + +/** @implements {ky.Dialect} */ +export class SqliteDialect { + #config; + + /** @param {SqliteDialectConfig} config */ + constructor(config) { + this.#config = config; + } + + createAdapter() { + return new ky.SqliteAdapter(); + } + + createQueryCompiler() { + return new ky.SqliteQueryCompiler(); + } + + /** @param {ky.Kysely} db */ + createIntrospector(db) { + return new ky.SqliteIntrospector(db); + } + + createDriver() { + return new SqliteDriver(this.#config); + } +} +/** @implements {ky.Driver} */ +export class SqliteDriver { + #config; + + #pool; + + /** @param {SqliteDialectConfig} config */ + constructor(config) { + this.#config = config; + this.#pool = createPool({ + create: async () => new SqliteConnection(await this.#config.database()), + destroy: async (db) => { + db.release(); + }, + }, { + min: 1, + max: 8, + }); + } + + async init() { + await this.#pool.ready(); + } + + async acquireConnection() { + this.#config.logger?.debug({ + size: this.#pool.size, + available: this.#pool.available, + borrowed: this.#pool.borrowed, + }, 'acquire connection'); + + const connection = await this.#pool.acquire(); + return connection; + } + + /** @param {SqliteConnection} connection */ + async releaseConnection(connection) { + this.#config.logger?.debug({ + size: this.#pool.size, + available: this.#pool.available, + borrowed: this.#pool.borrowed, + }, 'release connection'); + + await this.#pool.release(connection); + } + + /** @param {SqliteConnection} connection */ + async beginTransaction(connection) { + connection.beginTransaction(); + } + + /** @param {SqliteConnection} connection */ + async commitTransaction(connection) { + connection.commitTransaction(); + } + + /** @param {SqliteConnection} connection */ + async rollbackTransaction(connection) { + connection.rollbackTransaction(); + } + + async destroy() { + await this.#pool.drain(); + await this.#pool.clear(); + } +} + +/** @implements {ky.DatabaseConnection} */ +class SqliteConnection { + #db; + + /** @type {null | { stmt: import('better-sqlite3').Statement, sql: string }} */ + #cache = null; + + /** @param {import('better-sqlite3').Database} db */ + constructor(db) { + this.#db = db; + } + + release() { + this.#db.close(); + } + + beginTransaction() { + this.#db.exec('BEGIN IMMEDIATE'); + } + + commitTransaction() { + this.#db.exec('COMMIT'); + } + + rollbackTransaction() { + this.#db.exec('ROLLBACK'); + } + + /** @param {string} sql */ + #prepare(sql) { + if (this.#cache != null && this.#cache.sql === sql) { + return this.#cache.stmt; + } + const stmt = this.#db.prepare(sql); + this.#cache = { sql, stmt }; + return stmt; + } + + /** + * @template O + * @param {ky.CompiledQuery} compiledQuery + * @returns {Promise>} + */ + async executeQuery(compiledQuery) { + const stmt = this.#prepare(compiledQuery.sql); + + if (stmt.reader) { + return { + rows: /** @type {O[]} */ (stmt.all(compiledQuery.parameters)), + }; + } + + const { changes, lastInsertRowid } = stmt.run(compiledQuery.parameters); + return { + numAffectedRows: changes != null ? BigInt(changes) : undefined, + insertId: lastInsertRowid != null ? BigInt(lastInsertRowid) : undefined, + rows: [], + }; + } + + /** + * @template O + * @param {ky.CompiledQuery} compiledQuery + * @param {number} chunkSize + * @returns {AsyncIterableIterator>} + */ + async* streamQuery(compiledQuery, chunkSize) { + const stmt = this.#prepare(compiledQuery.sql); + if (!stmt.reader) { + throw new Error('only SELECT queries can be streamed'); + } + + let chunk = []; + for (const row of stmt.iterate(compiledQuery.parameters)) { + chunk.push(/** @type {O} */ (row)); + if (chunk.length >= chunkSize) { + yield { rows: chunk }; + chunk = []; + } + } + if (chunk.length > 0) { + yield { rows: chunk }; + } + } +} diff --git a/test/utils/createUwave.mjs b/test/utils/createUwave.mjs index 2df6839f..348b3749 100644 --- a/test/utils/createUwave.mjs +++ b/test/utils/createUwave.mjs @@ -1,8 +1,10 @@ import 'dotenv/config'; -import { once } from 'events'; -import { spawn } from 'child_process'; +import { once } from 'node:events'; +import { spawn } from 'node:child_process'; +import { unlink } from 'node:fs/promises'; import getPort from 'get-port'; import Redis from 'ioredis'; +import randomString from 'random-string'; import uwave from 'u-wave-core'; import testPlugin from './plugin.mjs'; @@ -61,6 +63,7 @@ async function createUwave(name, options) { const redisServer = process.env.REDIS_URL ? createRedisConnection() : await createIsolatedRedis(); + const sqlitePath = `testdb_${randomString()}.sqlite`; const port = await getPort(); @@ -68,7 +71,7 @@ async function createUwave(name, options) { ...options, port, redis: redisServer.url, - sqlite: ':memory:', + sqlite: sqlitePath, secret: Buffer.from(`secret_${name}`), logger: { level: 'error', @@ -81,6 +84,7 @@ async function createUwave(name, options) { try { await uw.close(); } finally { + await unlink(sqlitePath); await redisServer.close(); } };