Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"explain-error": "^1.0.4",
"express": "^5.0.0",
"express-session": "^1.18.1",
"generic-pool": "^3.9.0",
"has": "^1.0.3",
"helmet": "^8.0.0",
"htmlescape": "^1.1.1",
Expand Down
5 changes: 3 additions & 2 deletions src/Uwave.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { promisify } from 'node:util';
import Redis from 'ioredis';
import avvio from 'avvio';
import { pino } from 'pino';
import { CamelCasePlugin, Kysely, SqliteDialect } from 'kysely';
import { CamelCasePlugin, Kysely } from 'kysely';
import httpApi, { errorHandling } from './HttpApi.js';
import SocketServer from './SocketServer.js';
import { Source } from './Source.js';
Expand All @@ -20,7 +20,7 @@ import acl from './plugins/acl.js';
import waitlist from './plugins/waitlist.js';
import passport from './plugins/passport.js';
import migrations from './plugins/migrations.js';
import { SqliteDateColumnsPlugin, connect as connectSqlite } from './utils/sqlite.js';
import { SqliteDateColumnsPlugin, SqliteDialect, connect as connectSqlite } from './utils/sqlite.js';

const DEFAULT_SQLITE_PATH = './uwave.sqlite';
const DEFAULT_REDIS_URL = 'redis://localhost:6379';
Expand Down Expand Up @@ -154,6 +154,7 @@ class UwaveServer extends EventEmitter {
this.db = new Kysely({
dialect: new SqliteDialect({
database: () => connectSqlite(options.sqlite ?? DEFAULT_SQLITE_PATH),
logger: this.logger.child({ ns: 'uwave:sqlite' }),
}),
// dialect: new PostgresDialect({
// pool: new pg.Pool({
Expand Down
7 changes: 6 additions & 1 deletion src/plugins/booth.js
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,12 @@ class Booth {
const result = this.#locker.using(
[REDIS_ADVANCING],
10_000,
(signal) => this.#advanceLocked({ ...opts, signal }),
(signal) => {
return this.#uw.db.transaction()
.execute(async (tx) => {
return this.#advanceLocked({ ...opts, signal }, tx);
});
},
);
this.#awaitAdvance = result;
return result;
Expand Down
187 changes: 187 additions & 0 deletions src/utils/sqlite.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import lodash from 'lodash';
import * as ky from 'kysely';
import { createPool } from 'generic-pool';
import { sql, OperationNodeTransformer } from 'kysely';

/**
Expand Down Expand Up @@ -236,3 +238,188 @@ export async function connect(path) {
export function isForeignKeyError(err) {
return err instanceof Error && 'code' in err && err.code === 'SQLITE_CONSTRAINT_FOREIGNKEY';
}

/**
* @typedef {{
* database: () => Promise<import('better-sqlite3').Database>,
* logger?: import('pino').Logger,
* }} SqliteDialectConfig */

/** @implements {ky.Dialect} */
export class SqliteDialect {
#config;

/** @param {SqliteDialectConfig} config */
constructor(config) {
this.#config = config;
}

createAdapter() {
return new ky.SqliteAdapter();
}

createQueryCompiler() {
return new ky.SqliteQueryCompiler();
}

/** @param {ky.Kysely<unknown>} db */
createIntrospector(db) {
return new ky.SqliteIntrospector(db);
}

createDriver() {
return new SqliteDriver(this.#config);
}
}
/** @implements {ky.Driver} */
export class SqliteDriver {
#config;

#pool;

/** @param {SqliteDialectConfig} config */
constructor(config) {
this.#config = config;
this.#pool = createPool({
create: async () => new SqliteConnection(await this.#config.database()),
destroy: async (db) => {
db.release();
},
}, {
min: 1,
max: 8,
});
}

async init() {
await this.#pool.ready();
}

async acquireConnection() {
this.#config.logger?.debug({
size: this.#pool.size,
available: this.#pool.available,
borrowed: this.#pool.borrowed,
}, 'acquire connection');

const connection = await this.#pool.acquire();
return connection;
}

/** @param {SqliteConnection} connection */
async releaseConnection(connection) {
this.#config.logger?.debug({
size: this.#pool.size,
available: this.#pool.available,
borrowed: this.#pool.borrowed,
}, 'release connection');

await this.#pool.release(connection);
}

/** @param {SqliteConnection} connection */
async beginTransaction(connection) {
connection.beginTransaction();
}

/** @param {SqliteConnection} connection */
async commitTransaction(connection) {
connection.commitTransaction();
}

/** @param {SqliteConnection} connection */
async rollbackTransaction(connection) {
connection.rollbackTransaction();
}

async destroy() {
await this.#pool.drain();
await this.#pool.clear();
}
}

/** @implements {ky.DatabaseConnection} */
class SqliteConnection {
#db;

/** @type {null | { stmt: import('better-sqlite3').Statement, sql: string }} */
#cache = null;

/** @param {import('better-sqlite3').Database} db */
constructor(db) {
this.#db = db;
}

release() {
this.#db.close();
}

beginTransaction() {
this.#db.exec('BEGIN IMMEDIATE');
}

commitTransaction() {
this.#db.exec('COMMIT');
}

rollbackTransaction() {
this.#db.exec('ROLLBACK');
}

/** @param {string} sql */
#prepare(sql) {
if (this.#cache != null && this.#cache.sql === sql) {
return this.#cache.stmt;
}
const stmt = this.#db.prepare(sql);
this.#cache = { sql, stmt };
return stmt;
}

/**
* @template O
* @param {ky.CompiledQuery<unknown>} compiledQuery
* @returns {Promise<ky.QueryResult<O>>}
*/
async executeQuery(compiledQuery) {
const stmt = this.#prepare(compiledQuery.sql);

if (stmt.reader) {
return {
rows: /** @type {O[]} */ (stmt.all(compiledQuery.parameters)),
};
}

const { changes, lastInsertRowid } = stmt.run(compiledQuery.parameters);
return {
numAffectedRows: changes != null ? BigInt(changes) : undefined,
insertId: lastInsertRowid != null ? BigInt(lastInsertRowid) : undefined,
rows: [],
};
}

/**
* @template O
* @param {ky.CompiledQuery<unknown>} compiledQuery
* @param {number} chunkSize
* @returns {AsyncIterableIterator<ky.QueryResult<O>>}
*/
async* streamQuery(compiledQuery, chunkSize) {
const stmt = this.#prepare(compiledQuery.sql);
if (!stmt.reader) {
throw new Error('only SELECT queries can be streamed');
}

let chunk = [];
for (const row of stmt.iterate(compiledQuery.parameters)) {
chunk.push(/** @type {O} */ (row));
if (chunk.length >= chunkSize) {
yield { rows: chunk };
chunk = [];
}
}
if (chunk.length > 0) {
yield { rows: chunk };
}
}
}
10 changes: 7 additions & 3 deletions test/utils/createUwave.mjs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import 'dotenv/config';
import { once } from 'events';
import { spawn } from 'child_process';
import { once } from 'node:events';
import { spawn } from 'node:child_process';
import { unlink } from 'node:fs/promises';
import getPort from 'get-port';
import Redis from 'ioredis';
import randomString from 'random-string';
import uwave from 'u-wave-core';
import testPlugin from './plugin.mjs';

Expand Down Expand Up @@ -61,14 +63,15 @@ async function createUwave(name, options) {
const redisServer = process.env.REDIS_URL
? createRedisConnection()
: await createIsolatedRedis();
const sqlitePath = `testdb_${randomString()}.sqlite`;

const port = await getPort();

const uw = uwave({
...options,
port,
redis: redisServer.url,
sqlite: ':memory:',
sqlite: sqlitePath,
secret: Buffer.from(`secret_${name}`),
logger: {
level: 'error',
Expand All @@ -81,6 +84,7 @@ async function createUwave(name, options) {
try {
await uw.close();
} finally {
await unlink(sqlitePath);
await redisServer.close();
}
};
Expand Down
Loading