Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 55 additions & 56 deletions src/auto-mirror.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Env } from './worker';
import { convertSqlitePlaceholdersToPostgres, normalizeParams } from './sql-utils';

export class AutoMirrorDB {
constructor(private env: Env) { }
Expand All @@ -7,66 +8,70 @@ export class AutoMirrorDB {
const originalPrepare = this.env.DB.prepare.bind(this.env.DB);

this.env.DB.prepare = (sql: string) => {
const stmt = originalPrepare(sql);
const originalBind = stmt.bind.bind(stmt);

stmt.bind = (...params: unknown[]) => {
const bound = originalBind(...params);

// Patch all methods that can execute write operations
const originalRun = bound.run.bind(bound);
const originalAll = bound.all.bind(bound);
const originalFirst = bound.first.bind(bound);

// Only mirror if this is a write operation (INSERT, UPDATE, DELETE)
const isWriteOperation = this.isWriteSQL(sql);

bound.run = async <T = Record<string, unknown>>(): Promise<D1Result<T>> => {
const result = await originalRun<T>();
if (isWriteOperation) {
await this.mirrorToPostgres(sql, params);
}
return result;
};

bound.all = async <T = Record<string, unknown>>(): Promise<D1Result<T>> => {
const result = await originalAll<T>();
if (isWriteOperation) {
await this.mirrorToPostgres(sql, params);
}
return result;
};

bound.first = async <T = Record<string, unknown>>(colName?: string): Promise<T | null> => {
const result = await (colName ? originalFirst<T>(colName) : originalFirst<T>());
if (isWriteOperation) {
await this.mirrorToPostgres(sql, params);
}
return result;
};

return bound;
const statement = originalPrepare(sql);
const originalBind = statement.bind.bind(statement);
const patchedStatement = this.patchStatement(statement, sql, []);

patchedStatement.bind = (...args: unknown[]) => {
const normalizedParams = normalizeParams(args);
const boundStatement = originalBind(...args as []);
return this.patchStatement(boundStatement, sql, normalizedParams);
};

return stmt;
return patchedStatement;
};
}

/**
 * Wrap the execution methods (`run`, `all`, `first`) of a D1 prepared
 * statement so that, after a successful write, the same statement and
 * parameters are queued for mirroring to Postgres. Read statements pass
 * through untouched.
 *
 * @param statement the D1 statement whose methods are patched in place
 * @param sql       the original SQL text, used to classify reads vs writes
 * @param params    the parameters captured at bind() time
 * @returns the same statement instance, with patched execution methods
 */
private patchStatement(statement: D1PreparedStatement, sql: string, params: unknown[]) {
  // Snapshot the params so later mutation of the caller's array cannot
  // change what gets mirrored.
  const capturedParams = [...params];
  const shouldMirror = this.isWriteSQL(sql);

  const runUnpatched = statement.run.bind(statement);
  const allUnpatched = statement.all.bind(statement);
  const firstUnpatched = statement.first.bind(statement);

  // Mirror only after the primary D1 call has resolved successfully.
  const mirrorIfWrite = async (): Promise<void> => {
    if (shouldMirror) {
      await this.mirrorToPostgres(sql, capturedParams);
    }
  };

  statement.run = async <T = Record<string, unknown>>(): Promise<D1Result<T>> => {
    const outcome = await runUnpatched<T>();
    await mirrorIfWrite();
    return outcome;
  };

  statement.all = async <T = Record<string, unknown>>(): Promise<D1Result<T>> => {
    const outcome = await allUnpatched<T>();
    await mirrorIfWrite();
    return outcome;
  };

  statement.first = async <T = Record<string, unknown>>(colName?: string): Promise<T | null> => {
    // `first` has two call shapes (with/without a column name); preserve both.
    const outcome = await (colName ? firstUnpatched<T>(colName) : firstUnpatched<T>());
    await mirrorIfWrite();
    return outcome;
  };

  return statement;
}

private isWriteSQL(sql: string): boolean {
// Remove comments and normalize whitespace
const cleanSql = sql
.replace(/--.*$/gm, '') // Remove line comments
.replace(/\/\*[\s\S]*?\*\//g, '') // Remove block comments
.replace(/\s+/g, ' ') // Normalize whitespace
.replace(/--.*$/gm, '')
.replace(/\/\*[\s\S]*?\*\//g, '')
.replace(/\s+/g, ' ')
.trim()
.toLowerCase();

if (!cleanSql) return false;

// Split by semicolons to handle multi-statement SQL
const statements = cleanSql.split(';').map(s => s.trim()).filter(s => s.length > 0);
const statements = cleanSql
.split(';')
.map(s => s.trim())
.filter(s => s.length > 0);

// Check if any statement is a write operation
return statements.some(statement => {
const firstWord = statement.split(/\s+/)[0];
return ['insert', 'update', 'delete', 'replace', 'create', 'drop', 'alter'].includes(firstWord);
Expand All @@ -75,22 +80,16 @@ export class AutoMirrorDB {

/**
 * Queue a write statement for asynchronous replay against Postgres.
 * Failures are logged and swallowed by design, so mirroring can never
 * break the primary D1 operation.
 *
 * @param sql    SQL that was executed against D1 (SQLite `?` placeholders)
 * @param params the bound parameter values, in placeholder order
 */
private async mirrorToPostgres(sql: string, params: unknown[]) {
  try {
    // Rewrite `?` placeholders into the `$1, $2, …` form Postgres expects.
    const translatedSql = convertSqlitePlaceholdersToPostgres(sql);

    await this.env.MIRROR_QUEUE.send({
      sql: translatedSql,
      // Defensive copy: later mutation of the caller's array must not
      // alter the queued message.
      params: [...params],
      opId: crypto.randomUUID()
    });
  } catch (error) {
    // Best-effort: log and continue rather than failing the main write.
    console.error('Failed to queue mirror operation:', error);
  }
}

/**
 * Convert SQLite-style `?` placeholders to Postgres-style `$1, $2, …`.
 *
 * Fix: skip `?` characters that appear inside single-quoted SQL string
 * literals (including doubled-quote escapes, e.g. 'it''s ok?'); the old
 * bare /\?/g replacement rewrote those too, corrupting the statement.
 *
 * @param sql   SQL text using `?` placeholders
 * @param count retained for interface compatibility; the number of
 *              placeholders is derived from the SQL itself
 * @returns the SQL with positional `$n` placeholders
 */
private convertPlaceholders(sql: string, count: number): string {
  let next = 1;
  // Match either a complete single-quoted literal ('' escapes a quote)
  // or a lone `?`; only the latter is rewritten.
  return sql.replace(/'(?:[^']|'')*'|\?/g, match =>
    match === '?' ? `$${next++}` : match
  );
}
}
}
20 changes: 8 additions & 12 deletions src/db-router.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Env } from './worker';
import { TableColumn, convertSqlitePlaceholdersToPostgres, quoteIdentifier } from './sql-utils';

export async function executeQuery(env: Env, sql: string, params: unknown[] = []) {
if (env.PRIMARY_DB === "pg") {
Expand All @@ -16,7 +17,7 @@ export async function executeQuery(env: Env, sql: string, params: unknown[] = []
await client.connect();

// Convert D1 placeholders to Postgres placeholders
const pgSql = convertPlaceholders(sql, params.length);
const pgSql = convertSqlitePlaceholdersToPostgres(sql);
const result = await client.query(pgSql, params);

// Return in D1-compatible format
Expand Down Expand Up @@ -71,8 +72,8 @@ export async function executeQuery(env: Env, sql: string, params: unknown[] = []
}
}

export async function getTableInfo(env: Env, tableName: string) {
const sql = `PRAGMA table_info(${tableName})`;
export async function getTableInfo(env: Env, tableName: string): Promise<TableColumn[]> {
const sql = `PRAGMA table_info(${quoteIdentifier(tableName)})`;

if (env.PRIMARY_DB === "pg") {
// Get table info from Postgres
Expand Down Expand Up @@ -107,7 +108,7 @@ export async function getTableInfo(env: Env, tableName: string) {
ORDER BY ordinal_position
`, [tableName]);

return result.rows;
return result.rows as TableColumn[];
} catch (error) {
console.error('Failed to get table info from Postgres:', error);
throw new Error('Failed to get table schema');
Expand All @@ -123,7 +124,7 @@ export async function getTableInfo(env: Env, tableName: string) {
try {
const stmt = env.DB.prepare(sql);
const result = await stmt.all();
return result.results;
return result.results as unknown as TableColumn[];
} catch (error) {
console.error('Failed to get table info from D1:', error);
throw new Error('Failed to get table schema');
Expand Down Expand Up @@ -169,7 +170,7 @@ export async function getAllTables(env: Env): Promise<string[]> {
// Get tables from D1
try {
const stmt = env.DB.prepare(`
SELECT name FROM sqlite_master
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
`);
Expand All @@ -180,9 +181,4 @@ export async function getAllTables(env: Env): Promise<string[]> {
throw new Error('Failed to get table list');
}
}
}

/**
 * Convert SQLite-style `?` placeholders to Postgres-style `$1, $2, …`.
 *
 * Fix: skip `?` characters that appear inside single-quoted SQL string
 * literals (including doubled-quote escapes, e.g. 'it''s ok?'); the old
 * bare /\?/g replacement rewrote those too, corrupting the statement.
 *
 * @param sql   SQL text using `?` placeholders
 * @param count retained for interface compatibility; the number of
 *              placeholders is derived from the SQL itself
 * @returns the SQL with positional `$n` placeholders
 */
function convertPlaceholders(sql: string, count: number): string {
  let next = 1;
  // Match either a complete single-quoted literal ('' escapes a quote)
  // or a lone `?`; only the latter is rewritten.
  return sql.replace(/'(?:[^']|'')*'|\?/g, match =>
    match === '?' ? `$${next++}` : match
  );
}
}
49 changes: 29 additions & 20 deletions src/export-helper.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
import { Env } from './worker';
import { TableColumn, deriveOrderingColumns, formatPostgresValue, quoteIdentifier } from './sql-utils';

interface ExportOptions {
batchSize?: number;
tableName: string;
orderBy?: string;
orderBy?: string | string[];
whereClause?: string;
schema?: TableColumn[];
}

/**
* Stream export D1 data in batches to avoid memory issues
*/
export async function* streamTableData(env: Env, options: ExportOptions) {
const { tableName, batchSize = 1000, orderBy = 'id', whereClause } = options;
const { tableName, batchSize = 1000, orderBy, whereClause, schema } = options;

const orderingColumns = Array.isArray(orderBy)
? orderBy
: orderBy
? [orderBy]
: deriveOrderingColumns(schema);

const tableIdentifier = quoteIdentifier(tableName);
const resolvedOrdering = orderingColumns.length > 0 ? orderingColumns : ['rowid'];
const orderByClause = resolvedOrdering
.map(column => column.toLowerCase() === 'rowid' ? 'rowid' : quoteIdentifier(column))
.join(', ');

let offset = 0;
let hasMore = true;

while (hasMore) {
const whereSQL = whereClause ? `WHERE ${whereClause}` : '';
const sql = `
SELECT * FROM ${tableName}
SELECT * FROM ${tableIdentifier}
${whereSQL}
ORDER BY ${orderBy}
LIMIT ${batchSize}
ORDER BY ${orderByClause}
LIMIT ${batchSize}
OFFSET ${offset}
`;

Expand Down Expand Up @@ -50,10 +64,10 @@ export async function* streamTableData(env: Env, options: ExportOptions) {
/**
* Get table schema information from D1
*/
export async function getTableSchema(env: Env, tableName: string) {
const stmt = env.DB.prepare(`PRAGMA table_info(${tableName})`);
export async function getTableSchema(env: Env, tableName: string): Promise<TableColumn[]> {
const stmt = env.DB.prepare(`PRAGMA table_info(${quoteIdentifier(tableName)})`);
const result = await stmt.all();
return result.results;
return result.results as unknown as TableColumn[];
}

/**
Expand All @@ -75,22 +89,17 @@ export async function getAllTables(env: Env): Promise<string[]> {
export function generatePostgresInserts(
tableName: string,
rows: any[],
schema: any[]
schema: TableColumn[]
): string[] {
if (rows.length === 0) return [];

const columns = schema.map(col => col.name);
const columnList = columns.join(', ');
const quotedTableName = quoteIdentifier(tableName);
const columnList = columns.map(quoteIdentifier).join(', ');

return rows.map(row => {
const values = columns.map(col => {
const value = row[col];
if (value === null) return 'NULL';
if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'`;
if (typeof value === 'boolean') return value ? 'TRUE' : 'FALSE';
return value;
}).join(', ');

return `INSERT INTO ${tableName} (${columnList}) VALUES (${values}) ON CONFLICT DO NOTHING;`;
const values = columns.map(col => formatPostgresValue(row[col] ?? null)).join(', ');

return `INSERT INTO ${quotedTableName} (${columnList}) VALUES (${values}) ON CONFLICT DO NOTHING;`;
});
}
}
4 changes: 2 additions & 2 deletions src/export-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async function streamSqlExport(env: Env, tableName: string, batchSize: number, s
controller.enqueue(encoder.encode(header));

try {
for await (const batch of streamTableData(env, { tableName, batchSize })) {
for await (const batch of streamTableData(env, { tableName, batchSize, schema })) {
const inserts = generatePostgresInserts(tableName, batch.rows, schema);
const sql = inserts.join('\n') + '\n\n';
controller.enqueue(encoder.encode(sql));
Expand Down Expand Up @@ -87,7 +87,7 @@ async function streamJsonExport(env: Env, tableName: string, batchSize: number,
controller.enqueue(encoder.encode(',\n "data": [\n'));

try {
for await (const batch of streamTableData(env, { tableName, batchSize })) {
for await (const batch of streamTableData(env, { tableName, batchSize, schema })) {
for (const row of batch.rows) {
if (!isFirst) {
controller.enqueue(encoder.encode(',\n'));
Expand Down
Loading