From 8608a4568046e217afaeb96a387ddccc6e0baa0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sat, 30 Nov 2024 19:46:44 +0100 Subject: [PATCH 1/8] Use a custom kysely dialect with multiple connections --- src/Uwave.js | 5 +- src/utils/sqlite.js | 144 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 2 deletions(-) diff --git a/src/Uwave.js b/src/Uwave.js index d09a23de..33a6edf4 100644 --- a/src/Uwave.js +++ b/src/Uwave.js @@ -3,7 +3,7 @@ import { promisify } from 'node:util'; import Redis from 'ioredis'; import avvio from 'avvio'; import { pino } from 'pino'; -import { CamelCasePlugin, Kysely, SqliteDialect } from 'kysely'; +import { CamelCasePlugin, Kysely } from 'kysely'; import httpApi, { errorHandling } from './HttpApi.js'; import SocketServer from './SocketServer.js'; import { Source } from './Source.js'; @@ -20,7 +20,7 @@ import acl from './plugins/acl.js'; import waitlist from './plugins/waitlist.js'; import passport from './plugins/passport.js'; import migrations from './plugins/migrations.js'; -import { SqliteDateColumnsPlugin, connect as connectSqlite } from './utils/sqlite.js'; +import { SqliteDateColumnsPlugin, SqliteDialect, connect as connectSqlite } from './utils/sqlite.js'; const DEFAULT_SQLITE_PATH = './uwave.sqlite'; const DEFAULT_REDIS_URL = 'redis://localhost:6379'; @@ -154,6 +154,7 @@ class UwaveServer extends EventEmitter { this.db = new Kysely({ dialect: new SqliteDialect({ database: () => connectSqlite(options.sqlite ?? DEFAULT_SQLITE_PATH), + logger: this.logger.child({ ns: 'uwave:sqlite' }), }), // dialect: new PostgresDialect({ // pool: new pg.Pool({ diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index 40d89e8e..a0c6c176 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -1,4 +1,5 @@ import lodash from 'lodash'; +import * as ky from 'kysely'; import { sql, OperationNodeTransformer } from 'kysely'; /** @@ -236,3 +237,146 @@ export async function connect(path) { export function isForeignKeyError(err) { return err instanceof Error && 'code' in err && err.code === 'SQLITE_CONSTRAINT_FOREIGNKEY'; } + +/** + * @typedef {{ + * database: () => Promise, + * logger?: import('pino').Logger, + * }} SqliteDialectConfig */ + +/** @implements {ky.Dialect} */ +export class SqliteDialect { + #config; + + /** @param {SqliteDialectConfig} config */ + constructor(config) { + this.#config = config; + } + + createAdapter() { + return new ky.SqliteAdapter() + } + + createQueryCompiler() { + return new ky.SqliteQueryCompiler() + } + + /** @param {ky.Kysely} db */ + createIntrospector(db) { + return new ky.SqliteIntrospector(db) + } + + createDriver() { + return new SqliteDriver(this.#config); + } +} +/** @implements {ky.Driver} */ +export class SqliteDriver { + #config; + + #connections = 0; + + /** @param {SqliteDialectConfig} config */ + constructor(config) { + this.#config = config; + } + + async init() {} + + async acquireConnection () { + this.#connections += 1; + this.#config.logger?.debug({ active: this.#connections }, 'acquire connection'); + + const db = await this.#config.database(); + return new SqliteConnection(db); + } + + /** @param {SqliteConnection} db */ + async releaseConnection(db) { + this.#connections -= 1; + this.#config.logger?.debug({ active: this.#connections }, 'release connection'); + + db.release(); + } + + /** @param {SqliteConnection} db */ + async beginTransaction(db) { + db._beginTransaction(); + } + + /** @param {SqliteConnection} db */ + async commitTransaction(db) { + db._commitTransaction(); + } + + /** @param {SqliteConnection} db */ + async rollbackTransaction(db) { + db._rollbackTransaction(); + } + + async destroy() { + // Nothing to do :) + } +} + +/** @implements {ky.DatabaseConnection} */ +class SqliteConnection { + #db; + + /** @param {import('better-sqlite3').Database} db */ + constructor(db) { + this.#db = db; + } + + release() { + this.#db.close(); + } + + _beginTransaction() { + this.#db.exec('BEGIN IMMEDIATE'); + } + + _commitTransaction() { + this.#db.exec('COMMIT'); + } + + _rollbackTransaction() { + this.#db.exec('ROLLBACK'); + } + + /** + * @template O + * @param {ky.CompiledQuery} compiledQuery + * @returns {Promise>} + */ + async executeQuery(compiledQuery) { + const stmt = this.#db.prepare(compiledQuery.sql); + if (stmt.reader) { + return { + rows: /** @type {O[]} */ (stmt.all(compiledQuery.parameters)), + }; + } + + const { changes, lastInsertRowid } = stmt.run(compiledQuery.parameters); + return { + numUpdatedOrDeletedRows: changes != null ? BigInt(changes) : undefined, + numAffectedRows: changes != null ? BigInt(changes) : undefined, + insertId: lastInsertRowid != null ? BigInt(lastInsertRowid) : undefined, + rows: [], + }; + } + + /** + * @template O + * @param {ky.CompiledQuery} compiledQuery + * @param {number} chunkSize + * @returns {AsyncIterableIterator>} + */ + async* streamQuery(compiledQuery, chunkSize) { + const all = await this.executeQuery(compiledQuery); + void chunkSize; + for (const row of all.rows) { + yield { rows: [row] }; + } + } +} From 138a38bdcf866d506a56bc0ed38ce28c61f5e920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sat, 30 Nov 2024 19:58:07 +0100 Subject: [PATCH 2/8] Use a connection pool --- package.json | 1 + src/utils/sqlite.js | 64 +++++++++++++++++++++++++++++---------------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/package.json b/package.json index 5d9962b5..b4447ed9 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "explain-error": "^1.0.4", "express": "^5.0.0", "express-session": "^1.18.1", + "generic-pool": "^3.9.0", "has": "^1.0.3", "helmet": "^8.0.0", "htmlescape": "^1.1.1", diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index a0c6c176..65ff1f35 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -1,5 +1,6 @@ import lodash from 'lodash'; import * as ky from 'kysely'; +import { createPool } from 'generic-pool'; import { sql, OperationNodeTransformer } from 'kysely'; /** @@ -273,49 +274,66 @@ export class SqliteDialect { /** @implements {ky.Driver} */ export class SqliteDriver { #config; - - #connections = 0; + #pool; /** @param {SqliteDialectConfig} config */ constructor(config) { this.#config = config; + this.#pool = createPool({ + create: async () => new SqliteConnection(await this.#config.database()), + destroy: async (db) => { + db.release(); + }, + }, { + min: 1, + max: 8, + }); } - async init() {} + async init() { + await this.#pool.ready(); + } async acquireConnection () { - this.#connections += 1; - this.#config.logger?.debug({ active: this.#connections }, 'acquire connection'); - - const db = await this.#config.database(); - return new SqliteConnection(db); + this.#config.logger?.debug({ + size: this.#pool.size, + available: this.#pool.available, + borrowed: this.#pool.borrowed, + }, 'acquire connection'); + + const connection = await this.#pool.acquire(); + return connection; } - /** @param {SqliteConnection} db */ - async releaseConnection(db) { - this.#connections -= 1; - this.#config.logger?.debug({ active: this.#connections }, 'release connection'); + /** @param {SqliteConnection} connection */ + async releaseConnection(connection) { + this.#config.logger?.debug({ + size: this.#pool.size, + available: this.#pool.available, + borrowed: this.#pool.borrowed, + }, 'release connection'); - db.release(); + await this.#pool.release(connection); } - /** @param {SqliteConnection} db */ - async beginTransaction(db) { - db._beginTransaction(); + /** @param {SqliteConnection} connection */ + async beginTransaction(connection) { + connection._beginTransaction(); } - /** @param {SqliteConnection} db */ - async commitTransaction(db) { - db._commitTransaction(); + /** @param {SqliteConnection} connection */ + async commitTransaction(connection) { + connection._commitTransaction(); } - /** @param {SqliteConnection} db */ - async rollbackTransaction(db) { - db._rollbackTransaction(); + /** @param {SqliteConnection} connection */ + async rollbackTransaction(connection) { + connection._rollbackTransaction(); } async destroy() { - // Nothing to do :) + await this.#pool.drain(); + await this.#pool.clear(); } } From b17fd4d69e340f0de9db207ee9096594c5eb9a8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sat, 30 Nov 2024 20:07:01 +0100 Subject: [PATCH 3/8] Wrap booth advance in an async transaction --- src/plugins/booth.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/booth.js b/src/plugins/booth.js index 004e2919..6a86f550 100644 --- a/src/plugins/booth.js +++ b/src/plugins/booth.js @@ -493,7 +493,7 @@ class Booth { const result = this.#locker.using( [REDIS_ADVANCING], 10_000, - (signal) => this.#advanceLocked({ ...opts, signal }), + (signal) => this.#uw.db.transaction().execute(async (tx) => this.#advanceLocked({ ...opts, signal }, tx)), ); this.#awaitAdvance = result; return result; From e4c1a212d53747a5780c8c7e3f18a290fba1769b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sat, 30 Nov 2024 20:12:32 +0100 Subject: [PATCH 4/8] Reuse prepared statement for repeated queries --- src/utils/sqlite.js | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index 65ff1f35..f05ef712 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -340,6 +340,8 @@ export class SqliteDriver { /** @implements {ky.DatabaseConnection} */ class SqliteConnection { #db; + /** @type {null | { stmt: import('better-sqlite3').Statement, sql: string }} */ + #cache = null; /** @param {import('better-sqlite3').Database} db */ constructor(db) { @@ -362,13 +364,24 @@ class SqliteConnection { this.#db.exec('ROLLBACK'); } + /** @param {string} sql */ + #prepare(sql) { + if (this.#cache != null && this.#cache.sql === sql) { + return this.#cache.stmt; + } + const stmt = this.#db.prepare(sql); + this.#cache = { sql, stmt }; + return stmt; + } + /** * @template O * @param {ky.CompiledQuery} compiledQuery * @returns {Promise>} */ async executeQuery(compiledQuery) { - const stmt = this.#db.prepare(compiledQuery.sql); + const stmt = this.#prepare(compiledQuery.sql); + if (stmt.reader) { return { rows: /** @type {O[]} */ (stmt.all(compiledQuery.parameters)), @@ -392,9 +405,9 @@ class SqliteConnection { */ async* streamQuery(compiledQuery, chunkSize) { const all = await this.executeQuery(compiledQuery); - void chunkSize; for (const row of all.rows) { yield { rows: [row] }; } + void chunkSize; } } From d31913a152d9aa1fcbad852d8c310f4d2ab8e906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sun, 1 Dec 2024 09:34:52 +0100 Subject: [PATCH 5/8] chunk --- src/utils/sqlite.js | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index f05ef712..3f2cca23 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -404,10 +404,21 @@ class SqliteConnection { * @returns {AsyncIterableIterator>} */ async* streamQuery(compiledQuery, chunkSize) { - const all = await this.executeQuery(compiledQuery); - for (const row of all.rows) { - yield { rows: [row] }; + const stmt = this.#prepare(compiledQuery.sql); + if (!stmt.reader) { + throw new Error('only SELECT queries can be streamed'); + } + + let chunk = []; + for (const row of stmt.iterate(compiledQuery.parameters)) { + chunk.push(/** @type {O} */ (row)); + if (chunk.length >= chunkSize) { + yield { rows: chunk }; + chunk = []; + } + } + if (chunk.length > 0) { + yield { rows: chunk }; } - void chunkSize; } } From 33ddad83e7680d22291ec31d3c8cd0eaf453d671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Wed, 4 Dec 2024 19:25:00 +0100 Subject: [PATCH 6/8] eslint --fix --- src/utils/sqlite.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index 3f2cca23..550d6602 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -255,16 +255,16 @@ export class SqliteDialect { } createAdapter() { - return new ky.SqliteAdapter() + return new ky.SqliteAdapter(); } createQueryCompiler() { - return new ky.SqliteQueryCompiler() + return new ky.SqliteQueryCompiler(); } /** @param {ky.Kysely} db */ createIntrospector(db) { - return new ky.SqliteIntrospector(db) + return new ky.SqliteIntrospector(db); } createDriver() { @@ -274,6 +274,7 @@ export class SqliteDialect { /** @implements {ky.Driver} */ export class SqliteDriver { #config; + #pool; /** @param {SqliteDialectConfig} config */ @@ -294,7 +295,7 @@ export class SqliteDriver { await this.#pool.ready(); } - async acquireConnection () { + async acquireConnection() { this.#config.logger?.debug({ size: this.#pool.size, available: this.#pool.available, @@ -340,6 +341,7 @@ export class SqliteDriver { /** @implements {ky.DatabaseConnection} */ class SqliteConnection { #db; + /** @type {null | { stmt: import('better-sqlite3').Statement, sql: string }} */ #cache = null; From 83e73372d6eaea62a8166e484d3c4be1e3e444e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Thu, 17 Apr 2025 17:49:55 +0200 Subject: [PATCH 7/8] Use a temporary file for shared SQLite access --- test/utils/createUwave.mjs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/utils/createUwave.mjs b/test/utils/createUwave.mjs index 2df6839f..348b3749 100644 --- a/test/utils/createUwave.mjs +++ b/test/utils/createUwave.mjs @@ -1,8 +1,10 @@ import 'dotenv/config'; -import { once } from 'events'; -import { spawn } from 'child_process'; +import { once } from 'node:events'; +import { spawn } from 'node:child_process'; +import { unlink } from 'node:fs/promises'; import getPort from 'get-port'; import Redis from 'ioredis'; +import randomString from 'random-string'; import uwave from 'u-wave-core'; import testPlugin from './plugin.mjs'; @@ -61,6 +63,7 @@ async function createUwave(name, options) { const redisServer = process.env.REDIS_URL ? createRedisConnection() : await createIsolatedRedis(); + const sqlitePath = `testdb_${randomString()}.sqlite`; const port = await getPort(); @@ -68,7 +71,7 @@ async function createUwave(name, options) { ...options, port, redis: redisServer.url, - sqlite: ':memory:', + sqlite: sqlitePath, secret: Buffer.from(`secret_${name}`), logger: { level: 'error', @@ -81,6 +84,7 @@ async function createUwave(name, options) { try { await uw.close(); } finally { + await unlink(sqlitePath); await redisServer.close(); } }; From 7e635abc0a823894a1be0bb2608cc9cb6bebe120 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Thu, 17 Apr 2025 17:51:54 +0200 Subject: [PATCH 8/8] lint --- src/plugins/booth.js | 7 ++++++- src/utils/sqlite.js | 13 ++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/plugins/booth.js b/src/plugins/booth.js index 6a86f550..83c076b9 100644 --- a/src/plugins/booth.js +++ b/src/plugins/booth.js @@ -493,7 +493,12 @@ class Booth { const result = this.#locker.using( [REDIS_ADVANCING], 10_000, - (signal) => this.#uw.db.transaction().execute(async (tx) => this.#advanceLocked({ ...opts, signal }, tx)), + (signal) => { + return this.#uw.db.transaction() + .execute(async (tx) => { + return this.#advanceLocked({ ...opts, signal }, tx); + }); + }, ); this.#awaitAdvance = result; return result; diff --git a/src/utils/sqlite.js b/src/utils/sqlite.js index 550d6602..42e0d8b9 100644 --- a/src/utils/sqlite.js +++ b/src/utils/sqlite.js @@ -319,17 +319,17 @@ export class SqliteDriver { /** @param {SqliteConnection} connection */ async beginTransaction(connection) { - connection._beginTransaction(); + connection.beginTransaction(); } /** @param {SqliteConnection} connection */ async commitTransaction(connection) { - connection._commitTransaction(); + connection.commitTransaction(); } /** @param {SqliteConnection} connection */ async rollbackTransaction(connection) { - connection._rollbackTransaction(); + connection.rollbackTransaction(); } async destroy() { @@ -354,15 +354,15 @@ class SqliteConnection { this.#db.close(); } - _beginTransaction() { + beginTransaction() { this.#db.exec('BEGIN IMMEDIATE'); } - _commitTransaction() { + commitTransaction() { this.#db.exec('COMMIT'); } - _rollbackTransaction() { + rollbackTransaction() { this.#db.exec('ROLLBACK'); } @@ -392,7 +392,6 @@ class SqliteConnection { const { changes, lastInsertRowid } = stmt.run(compiledQuery.parameters); return { - numUpdatedOrDeletedRows: changes != null ? BigInt(changes) : undefined, numAffectedRows: changes != null ? BigInt(changes) : undefined, insertId: lastInsertRowid != null ? BigInt(lastInsertRowid) : undefined, rows: [],