From 453363538f1d4082f3e13394f38a4c1438be1bcd Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 6 Nov 2025 01:00:32 +0100 Subject: [PATCH 1/5] Cache cl for pks in map Signed-off-by: Somtochi Onyekwere --- core/rs/bundle/Cargo.toml | 1 + core/rs/bundle/src/lib.rs | 11 ++ core/rs/bundle_static/Cargo.lock | 1 + core/rs/core/src/c.rs | 13 +- core/rs/core/src/changes_vtab.rs | 13 ++ core/rs/core/src/changes_vtab_write.rs | 44 ++++- core/rs/core/src/commit.rs | 14 +- core/rs/core/src/db_version.rs | 15 ++ core/rs/core/src/lib.rs | 71 +++++-- core/rs/core/src/local_writes/after_delete.rs | 31 ++-- core/rs/core/src/local_writes/after_insert.rs | 32 +++- core/rs/core/src/local_writes/after_update.rs | 99 +++++----- core/rs/core/src/local_writes/mod.rs | 40 ++++ core/rs/core/src/tableinfo.rs | 9 +- core/rs/integration_check/Cargo.lock | 1 + .../integration_check/src/t/pk_only_tables.rs | 3 +- core/rs/integration_check/src/t/tableinfo.rs | 8 +- .../src/t/test_db_version.rs | 174 +++++++++++++++++- core/src/ext-data.c | 4 + core/src/ext-data.h | 3 + 20 files changed, 485 insertions(+), 102 deletions(-) diff --git a/core/rs/bundle/Cargo.toml b/core/rs/bundle/Cargo.toml index a5bcc151a..f79cef3bd 100644 --- a/core/rs/bundle/Cargo.toml +++ b/core/rs/bundle/Cargo.toml @@ -16,6 +16,7 @@ crate-type = ["rlib"] crsql_fractindex_core = {path="../fractindex-core"} crsql_core = { path="../core" } sqlite_nostd = { path="../sqlite-rs-embedded/sqlite_nostd" } +libc-print = "*" [profile.dev] panic = "abort" diff --git a/core/rs/bundle/src/lib.rs b/core/rs/bundle/src/lib.rs index 4c4a35df4..da0000469 100644 --- a/core/rs/bundle/src/lib.rs +++ b/core/rs/bundle/src/lib.rs @@ -11,6 +11,8 @@ use crsql_core::sqlite3_crsqlcore_init; #[cfg(feature = "test")] pub use crsql_core::test_exports; use crsql_fractindex_core::sqlite3_crsqlfractionalindex_init; +#[cfg(feature = "test")] +use libc_print::std_name::println; use sqlite_nostd as sqlite; use sqlite_nostd::SQLite3Allocator; @@ -21,11 +23,20 @@ static ALLOCATOR: SQLite3Allocator = SQLite3Allocator {}; // This must be our panic handler for WASM builds. For simplicity, we make it our panic handler for // all builds. Abort is also more portable than unwind, enabling us to go to more embedded use cases. +#[cfg(not(feature = "test"))] #[panic_handler] fn panic(_info: &PanicInfo) -> ! { core::intrinsics::abort() } +// Print panic info for tests +#[cfg(feature = "test")] +#[panic_handler] +fn panic(info: &PanicInfo) -> ! { + println!("PANIC!: {}", info); + core::intrinsics::abort(); +} + #[cfg(not(target_family = "wasm"))] #[lang = "eh_personality"] extern "C" fn eh_personality() {} diff --git a/core/rs/bundle_static/Cargo.lock b/core/rs/bundle_static/Cargo.lock index 909df5a63..55b91c73b 100644 --- a/core/rs/bundle_static/Cargo.lock +++ b/core/rs/bundle_static/Cargo.lock @@ -93,6 +93,7 @@ version = "0.1.0" dependencies = [ "crsql_core", "crsql_fractindex_core", + "libc-print", "sqlite_nostd", ] diff --git a/core/rs/core/src/c.rs b/core/rs/core/src/c.rs index 3a1539d96..44f82926a 100644 --- a/core/rs/core/src/c.rs +++ b/core/rs/core/src/c.rs @@ -74,6 +74,7 @@ pub struct crsql_ExtData { pub mergeEqualValues: ::core::ffi::c_int, pub timestamp: ::core::ffi::c_ulonglong, pub ordinalMap: *mut ::core::ffi::c_void, + pub clCache: *mut ::core::ffi::c_void, } #[repr(C)] @@ -271,7 +272,7 @@ fn bindgen_test_layout_crsql_ExtData() { let ptr = UNINIT.as_ptr(); assert_eq!( ::core::mem::size_of::(), - 168usize, + 176usize, concat!("Size of: ", stringify!(crsql_ExtData)) ); assert_eq!( @@ -511,4 +512,14 @@ fn bindgen_test_layout_crsql_ExtData() { stringify!(ordinalMap) ) ); + assert_eq!( + unsafe { ::core::ptr::addr_of!((*ptr).clCache) as usize - ptr as usize }, + 168usize, + concat!( + "Offset of field: ", + stringify!(crsql_ExtData), + "::", + stringify!(clCache) + ) + ); } diff --git a/core/rs/core/src/changes_vtab.rs b/core/rs/core/src/changes_vtab.rs index e75b80ab6..176f9fa34 100644 --- a/core/rs/core/src/changes_vtab.rs +++ b/core/rs/core/src/changes_vtab.rs @@ -586,6 +586,19 @@ pub extern "C" fn crsql_changes_rollback_to(vtab: *mut sqlite::vtab, _: c_int) - (*(*tab).pExtData).ordinalMap as *mut BTreeMap, i64>, )) }; + + let mut cl_cache = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*(*tab).pExtData).clCache as *mut BTreeMap>, + )) + }; + + for (_, map) in cl_cache.iter_mut() { + if !map.is_empty() { + map.clear(); + } + } + ordinals.clear(); ResultCode::OK as c_int } diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index 70d94d19c..f38daaf75 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -1,6 +1,8 @@ use alloc::boxed::Box; +use alloc::collections::BTreeMap; use alloc::ffi::CString; use alloc::format; +use alloc::string::String; use alloc::vec::Vec; use core::ffi::{c_char, c_int}; use core::mem; @@ -466,6 +468,11 @@ unsafe fn merge_insert( let tbl_infos = mem::ManuallyDrop::new(Box::from_raw( (*(*tab).pExtData).tableInfos as *mut Vec, )); + + let mut cl_cache = mem::ManuallyDrop::new(Box::from_raw( + (*(*tab).pExtData).clCache as *mut BTreeMap>, + )); + // TODO: will this work given `insert_tbl` is null termed? let tbl_info_index = tbl_infos.iter().position(|x| x.tbl_name == insert_tbl); @@ -487,7 +494,7 @@ unsafe fn merge_insert( // We'll need the key for all later operations. let key = tbl_info.get_or_create_key(db, &unpacked_pks)?; - let local_cl = get_local_cl(db, &tbl_info, key)?; + let local_cl = get_pk_cl(db, &mut cl_cache, &tbl_info, key)?; // We can ignore all updates from older causal lengths. // They won't win at anything. @@ -593,8 +600,6 @@ unsafe fn merge_insert( *rowid = slab_rowid(tbl_info_index as i32, inner_rowid); } return Ok(ResultCode::OK); - - } // we got a causal length which would resurrect the row. @@ -712,7 +717,40 @@ unsafe fn merge_insert( *errmsg = err.into_raw(); return Err(rc); } + + // a bigger cl always wins + if insert_cl > local_cl { + match cl_cache.get_mut(&tbl_info.tbl_name) { + Some(x) => { + x.insert(key, insert_cl); + } + None => { + let mut new_map = BTreeMap::new(); + new_map.insert(key, insert_cl); + cl_cache.insert(tbl_info.tbl_name.clone(), new_map); + } + } + } } res } + +unsafe fn get_pk_cl( + db: *mut sqlite3, + cl_cache: &mut BTreeMap>, + tbl_info: &TableInfo, + key: sqlite::int64, +) -> Result { + match cl_cache.get(&tbl_info.tbl_name).and_then(|x| x.get(&key)) { + Some(cl) => Ok(*cl), + None => { + let cl = get_local_cl(db, tbl_info, key)?; + cl_cache + .entry(tbl_info.tbl_name.clone()) + .or_default() + .insert(key, cl); + Ok(cl) + } + } +} diff --git a/core/rs/core/src/commit.rs b/core/rs/core/src/commit.rs index 4c9e43359..43dc64e0c 100644 --- a/core/rs/core/src/commit.rs +++ b/core/rs/core/src/commit.rs @@ -1,4 +1,4 @@ -use alloc::{boxed::Box, collections::BTreeMap, vec::Vec}; +use alloc::{boxed::Box, collections::BTreeMap, string::String, vec::Vec}; use core::{ ffi::{c_int, c_void}, mem, @@ -37,5 +37,17 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) { let mut ordinals: mem::ManuallyDrop, i64>>> = mem::ManuallyDrop::new( Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap, i64>), ); + + let mut cl_cache = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).clCache as *mut BTreeMap>, + )) + }; + + for (_, map) in cl_cache.iter_mut() { + if !map.is_empty() { + map.clear(); + } + } ordinals.clear(); } diff --git a/core/rs/core/src/db_version.rs b/core/rs/core/src/db_version.rs index 9aa604732..f5ac5ccbc 100644 --- a/core/rs/core/src/db_version.rs +++ b/core/rs/core/src/db_version.rs @@ -211,6 +211,21 @@ pub extern "C" fn crsql_drop_ordinal_map(ext_data: *mut crsql_ExtData) { } } +#[no_mangle] +pub extern "C" fn crsql_init_cl_cache(ext_data: *mut crsql_ExtData) { + let map: BTreeMap> = BTreeMap::new(); + unsafe { (*ext_data).clCache = Box::into_raw(Box::new(map)) as *mut c_void } +} + +#[no_mangle] +pub extern "C" fn crsql_drop_cl_cache(ext_data: *mut crsql_ExtData) { + unsafe { + drop(Box::from_raw( + (*ext_data).clCache as *mut BTreeMap>, + )); + } +} + pub fn insert_db_version( ext_data: *mut crsql_ExtData, insert_site_id: &[u8], diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index a8a0554d0..2f9133d33 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -49,6 +49,7 @@ mod unpack_columns_vtab; mod util; use alloc::format; +use alloc::string::String; use alloc::string::ToString; use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, vec::Vec}; use core::ffi::c_char; @@ -453,19 +454,30 @@ pub extern "C" fn sqlite3_crsqlcore_init( } #[cfg(feature = "test")] - let rc = db - .create_function_v2( - "crsql_cache_site_ordinal", - 1, - sqlite::UTF8 | sqlite::DETERMINISTIC, - Some(ext_data as *mut c_void), - Some(x_crsql_cache_site_ordinal), - None, - None, - None, - ) - .unwrap_or(ResultCode::ERROR); - if rc != ResultCode::OK { + if let Err(_) = db.create_function_v2( + "crsql_cache_site_ordinal", + 1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_cache_site_ordinal), + None, + None, + None, + ) { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + + if let Err(_) = db.create_function_v2( + "crsql_cache_pk_cl", + 2, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_cache_pk_cl), + None, + None, + None, + ) { unsafe { crsql_freeExtData(ext_data) }; return null_mut(); } @@ -975,6 +987,39 @@ unsafe extern "C" fn x_crsql_cache_site_ordinal( sqlite::result_int64(ctx, res); } +/** + * Get the pk cl cached in the ext data for the current transaction. + * only used for test to inspect the cl cache. + */ +unsafe extern "C" fn x_crsql_cache_pk_cl( + ctx: *mut sqlite::context, + argc: i32, + argv: *mut *mut sqlite::value, +) { + if argc < 2 { + ctx.result_error( + "Wrong number of args provided to crsql_cache_pk_cl. Provide the table name and pk key.", + ); + return; + } + + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + let args = sqlite::args!(argc, argv); + let table_name = args[0].text(); + let pk_key = args[1].int64(); + + let cl_cache = mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).clCache as *mut BTreeMap>, + )); + + let table_map = cl_cache.get(table_name); + let cl = table_map + .and_then(|x| x.get(&pk_key)) + .cloned() + .unwrap_or(-1); + sqlite::result_int64(ctx, cl); +} + /** * Return the timestamp for the current transaction. */ diff --git a/core/rs/core/src/local_writes/after_delete.rs b/core/rs/core/src/local_writes/after_delete.rs index 1f5b922e5..c624d4247 100644 --- a/core/rs/core/src/local_writes/after_delete.rs +++ b/core/rs/core/src/local_writes/after_delete.rs @@ -1,6 +1,8 @@ use alloc::string::String; use alloc::string::ToString; +use alloc::{boxed::Box, collections::BTreeMap}; use core::ffi::c_int; +use core::mem; use sqlite::sqlite3; use sqlite::value; use sqlite::Context; @@ -10,6 +12,7 @@ use sqlite_nostd as sqlite; use crate::{c::crsql_ExtData, tableinfo::TableInfo}; use super::bump_seq; +use super::mark_locally_deleted; use super::trigger_fn_preamble; /** @@ -47,19 +50,7 @@ fn after_delete( .get_or_create_key_via_raw_values(db, pks_old) .map_err(|_| "failed getting or creating lookaside key")?; - let mark_locally_deleted_stmt_ref = tbl_info - .get_mark_locally_deleted_stmt(db) - .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; - let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref - .as_ref() - .ok_or("Failed to deref sentinel stmt")?; - mark_locally_deleted_stmt - .bind_int64(1, key) - .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) - .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) - .and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC)) - .map_err(|_| "failed binding to mark locally deleted stmt")?; - super::step_trigger_stmt(mark_locally_deleted_stmt)?; + let cl = mark_locally_deleted(db, tbl_info, key, db_version, seq, &ts)?; // now actually delete the row metadata let drop_clocks_stmt_ref = tbl_info @@ -72,5 +63,17 @@ fn after_delete( drop_clocks_stmt .bind_int64(1, key) .map_err(|_e| "failed to bind pks to drop_clocks_stmt")?; - super::step_trigger_stmt(drop_clocks_stmt) + super::step_trigger_stmt(drop_clocks_stmt)?; + + let mut cl_cache = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).clCache as *mut BTreeMap>, + )) + }; + cl_cache + .entry(tbl_info.tbl_name.clone()) + .or_default() + .insert(key, cl); + + Ok(ResultCode::OK) } diff --git a/core/rs/core/src/local_writes/after_insert.rs b/core/rs/core/src/local_writes/after_insert.rs index e7a25c7c8..b6482a102 100644 --- a/core/rs/core/src/local_writes/after_insert.rs +++ b/core/rs/core/src/local_writes/after_insert.rs @@ -1,6 +1,9 @@ +use alloc::boxed::Box; +use alloc::collections::BTreeMap; use alloc::string::String; use alloc::string::ToString; use core::ffi::c_int; +use core::mem; use sqlite::sqlite3; use sqlite::value; use sqlite::Context; @@ -53,7 +56,17 @@ fn after_insert( } else if create_record_existed { // update the create record since it already exists. let seq = bump_seq(ext_data); - update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; + let col_version = update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; + + let mut cl_cache = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).clCache as *mut BTreeMap>, + )) + }; + cl_cache + .entry(tbl_info.tbl_name.clone()) + .or_default() + .insert(key_new, col_version); } super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; @@ -68,7 +81,7 @@ fn update_create_record( db_version: sqlite::int64, seq: i32, ts: &str, -) -> Result { +) -> Result { let update_create_record_stmt_ref = tbl_info .get_maybe_mark_locally_reinserted_stmt(db) .map_err(|_e| "failed to get update_create_record_stmt")?; @@ -90,5 +103,18 @@ fn update_create_record( }) .map_err(|_e| "failed binding to update_create_record_stmt")?; - super::step_trigger_stmt(update_create_record_stmt) + let res = update_create_record_stmt.step(); + match res { + Ok(ResultCode::ROW) => { + let col_version = update_create_record_stmt.column_int64(0); + super::reset_cached_stmt(update_create_record_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + Ok(col_version) + } + _ => { + super::reset_cached_stmt(update_create_record_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + Err("failed to step update_create_record_stmt".to_string()) + } + } } diff --git a/core/rs/core/src/local_writes/after_update.rs b/core/rs/core/src/local_writes/after_update.rs index 43f67b5fa..47fb4494f 100644 --- a/core/rs/core/src/local_writes/after_update.rs +++ b/core/rs/core/src/local_writes/after_update.rs @@ -1,8 +1,10 @@ use core::ffi::c_int; +use core::mem; use alloc::format; use alloc::string::String; use alloc::string::ToString; +use alloc::{boxed::Box, collections::BTreeMap}; use sqlite::{sqlite3, value, Context, ResultCode}; use sqlite_nostd as sqlite; @@ -81,39 +83,37 @@ fn after_update( let mut changed = false; // Changing a primary key column to a new value is the same thing as deleting the row // previously identified by the primary key. - if crate::compare_values::any_value_changed(pks_new, pks_old)? { - let old_key = tbl_info - .get_or_create_key_via_raw_values(db, pks_old) - .map_err(|_| "failed geteting or creating lookaside key")?; - let next_seq = super::bump_seq(ext_data); - changed = true; - // Record the delete of the row identified by the old primary keys - after_update__mark_old_pk_row_deleted( - db, - tbl_info, - old_key, - next_db_version, - next_seq, - &ts, - )?; - let next_seq = super::bump_seq(ext_data); - // todo: we don't need to this, if there's no existing row (cl is assumed to be 1). - super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, &ts)?; - for col in tbl_info.non_pks.iter() { + let cl_info = { + if crate::compare_values::any_value_changed(pks_new, pks_old)? { + let old_key = tbl_info + .get_or_create_key_via_raw_values(db, pks_old) + .map_err(|_| "failed geteting or creating lookaside key")?; let next_seq = super::bump_seq(ext_data); - after_update__move_non_pk_col( - db, - tbl_info, - new_key, - old_key, - &col.name, - next_db_version, - &ts, - next_seq, - )?; + changed = true; + // Record the delete of the row identified by the old primary keys + let cl = + super::mark_locally_deleted(db, tbl_info, old_key, next_db_version, next_seq, &ts)?; + let next_seq = super::bump_seq(ext_data); + // todo: we don't need to this, if there's no existing row (cl is assumed to be 1). + super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, &ts)?; + for col in tbl_info.non_pks.iter() { + let next_seq = super::bump_seq(ext_data); + after_update__move_non_pk_col( + db, + tbl_info, + new_key, + old_key, + &col.name, + next_db_version, + &ts, + next_seq, + )?; + } + Some((old_key, cl)) + } else { + None } - } - + }; // now for each non_pk_col we need to do an insert // where new value is not old value for ((new, old), col_info) in non_pks_new @@ -144,32 +144,19 @@ fn after_update( crate::db_version::next_db_version(db, ext_data)?; } - Ok(ResultCode::OK) -} + if let Some((old_key, cl)) = cl_info { + let mut cl_cache = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).clCache as *mut BTreeMap>, + )) + }; + cl_cache + .entry(tbl_info.tbl_name.clone()) + .or_default() + .insert(old_key, cl); + } -#[allow(non_snake_case)] -fn after_update__mark_old_pk_row_deleted( - db: *mut sqlite3, - tbl_info: &TableInfo, - old_key: sqlite::int64, - db_version: sqlite::int64, - seq: i32, - ts: &str, -) -> Result { - let mark_locally_deleted_stmt_ref = tbl_info - .get_mark_locally_deleted_stmt(db) - .or_else(|_e| Err("failed to get mark_locally_deleted_stmt"))?; - let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref - .as_ref() - .ok_or("Failed to deref sentinel stmt")?; - mark_locally_deleted_stmt - .bind_int64(1, old_key) - .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) - .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) - .and_then(|_| mark_locally_deleted_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) - // .and_then(|_| mark_locally_deleted_stmt.bind_int64(4, db_version)) - .or_else(|_| Err("failed binding to mark_locally_deleted_stmt"))?; - super::step_trigger_stmt(mark_locally_deleted_stmt) + Ok(ResultCode::OK) } #[allow(non_snake_case)] diff --git a/core/rs/core/src/local_writes/mod.rs b/core/rs/core/src/local_writes/mod.rs index f89483b52..6a987f262 100644 --- a/core/rs/core/src/local_writes/mod.rs +++ b/core/rs/core/src/local_writes/mod.rs @@ -267,3 +267,43 @@ fn mark_locally_updated( Ok(ResultCode::OK) } + +fn mark_locally_deleted( + db: *mut sqlite3, + tbl_info: &TableInfo, + key: sqlite::int64, + db_version: sqlite::int64, + seq: i32, + ts: &str, +) -> Result { + let mark_locally_deleted_stmt_ref = tbl_info + .get_mark_locally_deleted_stmt(db) + .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; + + let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref + .as_ref() + .ok_or("Failed to deref sentinel stmt")?; + + mark_locally_deleted_stmt + .bind_int64(1, key) + .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) + .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) + .and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC)) + .map_err(|_| "failed binding to mark locally deleted stmt")?; + + let res = mark_locally_deleted_stmt.step(); + + match res { + Ok(ResultCode::ROW) => { + let cl = mark_locally_deleted_stmt.column_int64(0); + reset_cached_stmt(mark_locally_deleted_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + Ok(cl) + } + _ => { + reset_cached_stmt(mark_locally_deleted_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + Err("failed to step mark locally deleted stmt".to_string()) + } + } +} diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 5a6eb6bc4..3b8081f65 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -458,7 +458,8 @@ impl TableInfo { db_version = excluded.db_version, seq = excluded.seq, site_id = 0, - ts = excluded.ts", + ts = excluded.ts + RETURNING col_version", table_name = crate::util::escape_ident(&self.tbl_name), sentinel = crate::c::DELETE_SENTINEL, ); @@ -644,7 +645,8 @@ impl TableInfo { seq = ?, site_id = 0, ts = ? - WHERE key = ? AND col_name = ?", + WHERE key = ? AND col_name = ? + RETURNING col_version", table_name = crate::util::escape_ident(&self.tbl_name), ); let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?; @@ -862,7 +864,8 @@ pub extern "C" fn crsql_ensure_table_infos_are_up_to_date( return ResultCode::ERROR as c_int; } - let mut table_infos = unsafe { Box::from_raw((*ext_data).tableInfos as *mut Vec) }; + let mut table_infos: Box> = + unsafe { Box::from_raw((*ext_data).tableInfos as *mut Vec) }; if schema_changed > 0 || table_infos.len() == 0 { match pull_all_table_infos(db, ext_data, err) { diff --git a/core/rs/integration_check/Cargo.lock b/core/rs/integration_check/Cargo.lock index 9ce49607f..fd9e431fd 100644 --- a/core/rs/integration_check/Cargo.lock +++ b/core/rs/integration_check/Cargo.lock @@ -123,6 +123,7 @@ version = "0.1.0" dependencies = [ "crsql_core", "crsql_fractindex_core", + "libc-print", "sqlite_nostd", ] diff --git a/core/rs/integration_check/src/t/pk_only_tables.rs b/core/rs/integration_check/src/t/pk_only_tables.rs index 8784caf87..3e8135d1c 100644 --- a/core/rs/integration_check/src/t/pk_only_tables.rs +++ b/core/rs/integration_check/src/t/pk_only_tables.rs @@ -149,7 +149,8 @@ fn modify_pkonly_row() -> Result<(), ResultCode> { .db .prepare_v2("UPDATE foo SET id = 2 WHERE id = 1;") .expect("prepare set to foo"); - stmt.step().expect("step update to foo"); + let result = stmt.step(); + assert_eq!(result, Ok(ResultCode::DONE), "failed to update foo"); sync_left_to_right(&db_a.db, &db_b.db, -1); diff --git a/core/rs/integration_check/src/t/tableinfo.rs b/core/rs/integration_check/src/t/tableinfo.rs index 1f5574c72..f7b9f982c 100644 --- a/core/rs/integration_check/src/t/tableinfo.rs +++ b/core/rs/integration_check/src/t/tableinfo.rs @@ -366,7 +366,9 @@ fn test_site_id_initialization() { let raw_db = db.db.db; let site_id = select_site_id(raw_db).expect("selected site id"); assert_eq!(site_id.len(), 16); - raw_db.exec_safe("DELETE FROM crsql_site_id;").expect("deleted site id"); + raw_db + .exec_safe("DELETE FROM crsql_site_id;") + .expect("deleted site id"); } { @@ -374,7 +376,9 @@ fn test_site_id_initialization() { let raw_db = db.db.db; let site_id = select_site_id(raw_db).expect("selected site id"); assert_eq!(site_id.len(), 16); - raw_db.exec_safe("DROP TABLE crsql_site_id;").expect("dropped crsql_site_id"); + raw_db + .exec_safe("DROP TABLE crsql_site_id;") + .expect("dropped crsql_site_id"); } { diff --git a/core/rs/integration_check/src/t/test_db_version.rs b/core/rs/integration_check/src/t/test_db_version.rs index aacf08ab7..9e00ae032 100644 --- a/core/rs/integration_check/src/t/test_db_version.rs +++ b/core/rs/integration_check/src/t/test_db_version.rs @@ -3,7 +3,7 @@ use alloc::{ffi::CString, format, string::String}; use core::ffi::c_char; use crsql_bundle::test_exports; use sqlite::{Connection, ResultCode}; -use sqlite_nostd as sqlite; +use sqlite_nostd::{self as sqlite, ManagedStmt}; fn make_site() -> *mut c_char { let inner_ptr: *mut c_char = CString::new("0000000000000000").unwrap().into_raw(); @@ -19,7 +19,8 @@ fn get_site_id(db: *mut sqlite::sqlite3) -> *mut c_char { let blob_ptr = stmt.column_blob(0).expect("failed to get site_id"); - let cstring = CString::new(blob_ptr.to_vec()).expect("failed to create CString from site id"); + // use vec_unchecked because `new` errors if there's a 0 byte in the vec. + let cstring = unsafe { CString::from_vec_unchecked(blob_ptr.to_vec()) }; cstring.into_raw() as *mut c_char } @@ -182,6 +183,164 @@ fn test_get_or_set_site_ordinal() -> Result<(), ResultCode> { Ok(()) } +fn test_get_or_set_pk_cl() -> Result<(), ResultCode> { + let c = crate::opendb().expect("db opened"); + let db = &c.db; + db.db + .exec_safe("CREATE TABLE foo (a primary key not null, b);")?; + + db.db.exec_safe("SELECT crsql_as_crr('foo');")?; + + let insert_foo_stmt = db.db.prepare_v2("INSERT INTO foo VALUES (?, ?);")?; + + let get_pk_key_stmt = db + .db + .prepare_v2("SELECT __crsql_key from foo__crsql_pks where a = ?;")?; + + let get_cache_cl_stmt = db.db.prepare_v2("SELECT crsql_cache_pk_cl(?, ?);")?; + + db.db.exec_safe("BEGIN TRANSACTION;")?; + + // insert row, pl cache doesn't get updated on new insert + + insert_foo_row(&insert_foo_stmt, 1, "b")?; + + insert_foo_row(&insert_foo_stmt, 2, "c")?; + + let key1 = get_pk_key(&get_pk_key_stmt, 1).expect("get pk key"); + let key2 = get_pk_key(&get_pk_key_stmt, 2).expect("get pk key"); + + let delete_foo_stmt = db + .db + .prepare_v2("DELETE FROM foo WHERE a = ?;") + .expect("prepare delete foo"); + delete_foo_stmt.bind_int64(1, 1)?; + delete_foo_stmt.step()?; + + // replace pk 2 with pk 3 + let update_foo_stmt = db + .db + .prepare_v2("UPDATE foo SET a = ? WHERE a = ?;") + .expect("prepare update foo"); + update_foo_stmt.bind_int64(1, 3)?; + update_foo_stmt.bind_int64(2, 2)?; + update_foo_stmt.step()?; + + // pk 1 and 2 should have a cl of 2 since they have been deleted + assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key1)?); + + assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + + // reinsert pk 2, check cl is 3 + insert_foo_row(&insert_foo_stmt, 2, "d")?; + assert_eq!(3, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + + db.db.exec_safe("COMMIT;")?; + + // commit clears the cache + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key1)?); + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + + db.db.exec_safe("BEGIN TRANSACTION;")?; + db.db.exec_safe("SAVEPOINT test;")?; + + // new site_id in crsql_changes table + // pk number is 4 + let pk: [u8; 3] = [1, 9, 4]; + + // insert should update the cache. + insert_crsql_changes_row(db.db, &pk, "b", "e", 1, 1, 1).expect("insert crsql changes row"); + let key4 = get_pk_key(&get_pk_key_stmt, 4).expect("get pk key"); + assert_eq!(1, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + // a delete should also update the cache. + insert_crsql_changes_row(db.db, &pk, "-1", "", 2, 2, 2)?; + assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + // test that a resurrected cache would also get updated. + insert_crsql_changes_row(db.db, &pk, "b", "f", 1, 3, 5)?; + assert_eq!(5, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + // a lower cl should not update the cache + insert_crsql_changes_row(db.db, &pk, "b", "e", 1, 3, 3)?; + assert_eq!(5, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + db.db.exec_safe("ROLLBACK TO SAVEPOINT test;")?; + + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + Ok(()) +} + +fn insert_crsql_changes_row( + db: *mut sqlite::sqlite3, + pk: &[u8], + cid: &str, + val: &str, + col_version: i64, + db_version: i64, + cl: i64, +) -> Result<(), ResultCode> { + let stmt = db.prepare_v2( + "INSERT INTO crsql_changes VALUES ('foo', ?, ?, ?, ?, ?, X'0000000000000000', ?, 0, 0);", + )?; + stmt.bind_blob(1, pk, sqlite::Destructor::STATIC)?; + stmt.bind_text(2, cid, sqlite::Destructor::STATIC)?; + if cid == "-1" { + stmt.bind_null(3)?; + } else { + stmt.bind_text(3, val, sqlite::Destructor::STATIC)?; + } + stmt.bind_int64(4, col_version)?; + stmt.bind_int64(5, db_version)?; + stmt.bind_int64(6, cl)?; + stmt.step()?; + Ok(()) +} + +fn insert_foo_row(stmt: &ManagedStmt, col1: i64, col2: &str) -> Result<(), ResultCode> { + stmt.bind_int64(1, col1)?; + stmt.bind_text(2, col2, sqlite::Destructor::STATIC)?; + stmt.step()?; + + reset_cached_stmt(stmt)?; + Ok(()) +} + +fn get_pk_key(stmt: &ManagedStmt, pk_value: i64) -> Result { + stmt.bind_int64(1, pk_value)?; + let res = stmt.step()?; + match res { + ResultCode::ROW => { + let key = stmt.column_int64(0); + reset_cached_stmt(stmt)?; + Ok(key) + } + _ => { + reset_cached_stmt(stmt)?; + Err(ResultCode::ERROR) + } + } +} + +fn get_cache_cl(stmt: &ManagedStmt, table_name: &str, pk_key: i64) -> Result { + stmt.bind_text(1, table_name, sqlite::Destructor::STATIC) + .expect("bind table name"); + stmt.bind_int64(2, pk_key).expect("bind pk key"); + let res = stmt.step(); + match res { + Ok(ResultCode::ROW) => { + let key = stmt.column_int64(0); + reset_cached_stmt(stmt).expect("reset cached stmt"); + Ok(key) + } + _ => { + reset_cached_stmt(stmt).expect("reset cached stmt"); + Err(ResultCode::ERROR) + } + } +} + fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result { let stmt = db.prepare_v2("SELECT crsql_cache_site_ordinal(?);")?; stmt.bind_blob(1, site_id, sqlite::Destructor::STATIC)?; @@ -192,8 +351,13 @@ fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result Result<(), String> { test_fetch_db_version_from_storage()?; test_next_db_version()?; - if let Err(rc) = test_get_or_set_site_ordinal() { - return Err(format!("test_get_or_set_site_ordinal failed: {:?}", rc)); - } + test_get_or_set_site_ordinal() + .map_err(|e| format!("test_get_or_set_site_ordinal failed: {:?}", e))?; + test_get_or_set_pk_cl().map_err(|e| format!("test_get_or_set_pk_cl failed: {:?}", e))?; Ok(()) } + +pub fn reset_cached_stmt(stmt: &ManagedStmt) -> Result { + stmt.clear_bindings()?; + stmt.reset() +} diff --git a/core/src/ext-data.c b/core/src/ext-data.c index 5ae04582d..65d0ef25e 100644 --- a/core/src/ext-data.c +++ b/core/src/ext-data.c @@ -12,6 +12,8 @@ void crsql_drop_ordinal_map(crsql_ExtData *pExtData); void crsql_drop_table_info_vec(crsql_ExtData *pExtData); void crsql_init_last_db_versions_map(crsql_ExtData *pExtData); void crsql_drop_last_db_versions_map(crsql_ExtData *pExtData); +void crsql_init_cl_cache(crsql_ExtData *pExtData); +void crsql_drop_cl_cache(crsql_ExtData *pExtData); // void crsql_init_table_info_vec(crsql_ExtData *pExtData); // void crsql_drop_table_info_vec(crsql_ExtData *pExtData); @@ -75,6 +77,7 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db) { crsql_init_table_info_vec(pExtData); crsql_init_last_db_versions_map(pExtData); crsql_init_ordinal_map(pExtData); + crsql_init_cl_cache(pExtData); sqlite3_stmt *pStmt; @@ -167,6 +170,7 @@ void crsql_freeExtData(crsql_ExtData *pExtData) { crsql_drop_table_info_vec(pExtData); crsql_drop_last_db_versions_map(pExtData); crsql_drop_ordinal_map(pExtData); + crsql_drop_cl_cache(pExtData); sqlite3_free(pExtData); } diff --git a/core/src/ext-data.h b/core/src/ext-data.h index 2d8dd9b41..155192eeb 100644 --- a/core/src/ext-data.h +++ b/core/src/ext-data.h @@ -49,6 +49,7 @@ struct crsql_ExtData { int mergeEqualValues; unsigned long long timestamp; void *ordinalMap; + void *clCache; }; crsql_ExtData *crsql_newExtData(sqlite3 *db); @@ -61,5 +62,7 @@ int crsql_recreate_db_version_stmt(sqlite3 *db, crsql_ExtData *pExtData); void crsql_finalize(crsql_ExtData *pExtData); void crsql_init_ordinal_map(crsql_ExtData *pExtData); void crsql_drop_ordinal_map(crsql_ExtData *pExtData); +void crsql_init_cl_cache(crsql_ExtData *pExtData); +void crsql_drop_cl_cache(crsql_ExtData *pExtData); #endif From 74967a4c970a2f515f0e8b11ee28caa8e539f0b7 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 6 Nov 2025 19:46:40 +0100 Subject: [PATCH 2/5] put pk on tableinfo instead Signed-off-by: Somtochi Onyekwere --- core/rs/core/src/c.rs | 11 --- core/rs/core/src/changes_vtab.rs | 13 +-- core/rs/core/src/changes_vtab_write.rs | 97 +++++++------------ core/rs/core/src/commit.rs | 17 ++-- core/rs/core/src/db_version.rs | 15 --- core/rs/core/src/lib.rs | 23 ++--- core/rs/core/src/local_writes/after_delete.rs | 38 +++----- core/rs/core/src/local_writes/after_insert.rs | 16 +-- core/rs/core/src/local_writes/after_update.rs | 14 +-- core/rs/core/src/local_writes/mod.rs | 6 +- core/rs/core/src/tableinfo.rs | 17 ++++ 11 files changed, 99 insertions(+), 168 deletions(-) diff --git a/core/rs/core/src/c.rs b/core/rs/core/src/c.rs index 44f82926a..eeb3f784c 100644 --- a/core/rs/core/src/c.rs +++ b/core/rs/core/src/c.rs @@ -74,7 +74,6 @@ pub struct crsql_ExtData { pub mergeEqualValues: ::core::ffi::c_int, pub timestamp: ::core::ffi::c_ulonglong, pub ordinalMap: *mut ::core::ffi::c_void, - pub clCache: *mut ::core::ffi::c_void, } #[repr(C)] @@ -512,14 +511,4 @@ fn bindgen_test_layout_crsql_ExtData() { stringify!(ordinalMap) ) ); - assert_eq!( - unsafe { ::core::ptr::addr_of!((*ptr).clCache) as usize - ptr as usize }, - 168usize, - concat!( - "Offset of field: ", - stringify!(crsql_ExtData), - "::", - stringify!(clCache) - ) - ); } diff --git a/core/rs/core/src/changes_vtab.rs b/core/rs/core/src/changes_vtab.rs index 176f9fa34..47e760b79 100644 --- a/core/rs/core/src/changes_vtab.rs +++ b/core/rs/core/src/changes_vtab.rs @@ -587,18 +587,15 @@ pub extern "C" fn crsql_changes_rollback_to(vtab: *mut sqlite::vtab, _: c_int) - )) }; - let mut cl_cache = unsafe { + let mut table_infos = unsafe { mem::ManuallyDrop::new(Box::from_raw( - (*(*tab).pExtData).clCache as *mut BTreeMap>, + (*(*tab).pExtData).tableInfos as *mut Vec, )) }; - - for (_, map) in cl_cache.iter_mut() { - if !map.is_empty() { - map.clear(); - } + for tbl_info in table_infos.iter_mut() { + tbl_info.clear_cl_cache(); } - + ordinals.clear(); ResultCode::OK as c_int } diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index f38daaf75..a543f78ab 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -1,8 +1,6 @@ use alloc::boxed::Box; -use alloc::collections::BTreeMap; use alloc::ffi::CString; use alloc::format; -use alloc::string::String; use alloc::vec::Vec; use core::ffi::{c_char, c_int}; use core::mem; @@ -382,36 +380,45 @@ pub unsafe extern "C" fn crsql_merge_insert( fn get_local_cl( db: *mut sqlite::sqlite3, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, key: sqlite::int64, ) -> Result { - let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?; - let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - - let rc = local_cl_stmt - .bind_int64(1, key) - .and_then(|_| local_cl_stmt.bind_int64(2, key)); - if let Err(rc) = rc { - reset_cached_stmt(local_cl_stmt.stmt)?; - return Err(rc); + if let Some(cl) = tbl_info.get_cl(key) { + return Ok(*cl); } - let step_result = local_cl_stmt.step(); - match step_result { - Ok(ResultCode::ROW) => { - let ret = local_cl_stmt.column_int64(0); - reset_cached_stmt(local_cl_stmt.stmt)?; - Ok(ret) - } - Ok(ResultCode::DONE) => { + let cl = { + let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?; + let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; + + let rc = local_cl_stmt + .bind_int64(1, key) + .and_then(|_| local_cl_stmt.bind_int64(2, key)); + if let Err(rc) = rc { reset_cached_stmt(local_cl_stmt.stmt)?; - Ok(0) + return Err(rc); } - Ok(rc) | Err(rc) => { - reset_cached_stmt(local_cl_stmt.stmt)?; - Err(rc) + + let step_result = local_cl_stmt.step(); + match step_result { + Ok(ResultCode::ROW) => { + let ret = local_cl_stmt.column_int64(0); + reset_cached_stmt(local_cl_stmt.stmt)?; + ret + } + Ok(ResultCode::DONE) => { + reset_cached_stmt(local_cl_stmt.stmt)?; + 0 + } + Ok(rc) | Err(rc) => { + reset_cached_stmt(local_cl_stmt.stmt)?; + return Err(rc); + } } - } + }; + + tbl_info.set_cl(key, cl); + Ok(cl) } unsafe fn merge_insert( @@ -465,14 +472,10 @@ unsafe fn merge_insert( let insert_site_id = insert_site_id.blob(); - let tbl_infos = mem::ManuallyDrop::new(Box::from_raw( + let mut tbl_infos = mem::ManuallyDrop::new(Box::from_raw( (*(*tab).pExtData).tableInfos as *mut Vec, )); - let mut cl_cache = mem::ManuallyDrop::new(Box::from_raw( - (*(*tab).pExtData).clCache as *mut BTreeMap>, - )); - // TODO: will this work given `insert_tbl` is null termed? let tbl_info_index = tbl_infos.iter().position(|x| x.tbl_name == insert_tbl); @@ -487,14 +490,14 @@ unsafe fn merge_insert( // TODO: technically safe since we checked `is_none` but this should be more idiomatic let tbl_info_index = tbl_info_index.unwrap(); - let tbl_info = &tbl_infos[tbl_info_index]; + let tbl_info = &mut tbl_infos[tbl_info_index]; let unpacked_pks = unpack_columns(insert_pks.blob())?; // Get or create key as the first thing we do. // We'll need the key for all later operations. let key = tbl_info.get_or_create_key(db, &unpacked_pks)?; - let local_cl = get_pk_cl(db, &mut cl_cache, &tbl_info, key)?; + let local_cl = get_local_cl(db, tbl_info, key)?; // We can ignore all updates from older causal lengths. // They won't win at anything. @@ -720,37 +723,9 @@ unsafe fn merge_insert( // a bigger cl always wins if insert_cl > local_cl { - match cl_cache.get_mut(&tbl_info.tbl_name) { - Some(x) => { - x.insert(key, insert_cl); - } - None => { - let mut new_map = BTreeMap::new(); - new_map.insert(key, insert_cl); - cl_cache.insert(tbl_info.tbl_name.clone(), new_map); - } - } + tbl_info.set_cl(key, insert_cl); } } res } - -unsafe fn get_pk_cl( - db: *mut sqlite3, - cl_cache: &mut BTreeMap>, - tbl_info: &TableInfo, - key: sqlite::int64, -) -> Result { - match cl_cache.get(&tbl_info.tbl_name).and_then(|x| x.get(&key)) { - Some(cl) => Ok(*cl), - None => { - let cl = get_local_cl(db, tbl_info, key)?; - cl_cache - .entry(tbl_info.tbl_name.clone()) - .or_default() - .insert(key, cl); - Ok(cl) - } - } -} diff --git a/core/rs/core/src/commit.rs b/core/rs/core/src/commit.rs index 43dc64e0c..096482e9e 100644 --- a/core/rs/core/src/commit.rs +++ b/core/rs/core/src/commit.rs @@ -1,4 +1,4 @@ -use alloc::{boxed::Box, collections::BTreeMap, string::String, vec::Vec}; +use alloc::{boxed::Box, collections::BTreeMap, vec::Vec}; use core::{ ffi::{c_int, c_void}, mem, @@ -8,6 +8,7 @@ use core::{ use sqlite_nostd::ResultCode; use crate::c::crsql_ExtData; +use crate::tableinfo::TableInfo; #[no_mangle] pub unsafe extern "C" fn crsql_commit_hook(user_data: *mut c_void) -> c_int { @@ -38,16 +39,12 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) { Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap, i64>), ); - let mut cl_cache = unsafe { - mem::ManuallyDrop::new(Box::from_raw( - (*ext_data).clCache as *mut BTreeMap>, - )) + let mut table_infos = unsafe { + mem::ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec)) }; + ordinals.clear(); - for (_, map) in cl_cache.iter_mut() { - if !map.is_empty() { - map.clear(); - } + for tbl_info in table_infos.iter_mut() { + tbl_info.clear_cl_cache(); } - ordinals.clear(); } diff --git a/core/rs/core/src/db_version.rs b/core/rs/core/src/db_version.rs index f5ac5ccbc..9aa604732 100644 --- a/core/rs/core/src/db_version.rs +++ b/core/rs/core/src/db_version.rs @@ -211,21 +211,6 @@ pub extern "C" fn crsql_drop_ordinal_map(ext_data: *mut crsql_ExtData) { } } -#[no_mangle] -pub extern "C" fn crsql_init_cl_cache(ext_data: *mut crsql_ExtData) { - let map: BTreeMap> = BTreeMap::new(); - unsafe { (*ext_data).clCache = Box::into_raw(Box::new(map)) as *mut c_void } -} - -#[no_mangle] -pub extern "C" fn crsql_drop_cl_cache(ext_data: *mut crsql_ExtData) { - unsafe { - drop(Box::from_raw( - (*ext_data).clCache as *mut BTreeMap>, - )); - } -} - pub fn insert_db_version( ext_data: *mut crsql_ExtData, insert_site_id: &[u8], diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 2f9133d33..71254354f 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -49,7 +49,6 @@ mod unpack_columns_vtab; mod util; use alloc::format; -use alloc::string::String; use alloc::string::ToString; use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, vec::Vec}; use core::ffi::c_char; @@ -74,7 +73,9 @@ use local_writes::after_update::x_crsql_after_update; use sqlite::{Destructor, ResultCode}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context, Value}; -use tableinfo::{crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info}; +use tableinfo::{ + crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info, TableInfo, +}; use teardown::*; use triggers::create_triggers; @@ -1008,16 +1009,16 @@ unsafe extern "C" fn x_crsql_cache_pk_cl( let table_name = args[0].text(); let pk_key = args[1].int64(); - let cl_cache = mem::ManuallyDrop::new(Box::from_raw( - (*ext_data).clCache as *mut BTreeMap>, - )); + let table_infos = + mem::ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec)); + let table_info = table_infos.iter().find(|t| t.tbl_name == table_name); - let table_map = cl_cache.get(table_name); - let cl = table_map - .and_then(|x| x.get(&pk_key)) - .cloned() - .unwrap_or(-1); - sqlite::result_int64(ctx, cl); + if let Some(table_info) = table_info { + let cl = table_info.get_cl(pk_key).cloned().unwrap_or(-1); + sqlite::result_int64(ctx, cl); + } else { + ctx.result_error("table not found"); + } } /** diff --git a/core/rs/core/src/local_writes/after_delete.rs b/core/rs/core/src/local_writes/after_delete.rs index c624d4247..724a4b386 100644 --- a/core/rs/core/src/local_writes/after_delete.rs +++ b/core/rs/core/src/local_writes/after_delete.rs @@ -1,8 +1,6 @@ use alloc::string::String; use alloc::string::ToString; -use alloc::{boxed::Box, collections::BTreeMap}; use core::ffi::c_int; -use core::mem; use sqlite::sqlite3; use sqlite::value; use sqlite::Context; @@ -40,7 +38,7 @@ pub unsafe extern "C" fn x_crsql_after_delete( fn after_delete( db: *mut sqlite3, ext_data: *mut crsql_ExtData, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, pks_old: &[*mut value], ) -> Result { let ts = unsafe { (*ext_data).timestamp.to_string() }; @@ -52,28 +50,22 @@ fn after_delete( let cl = mark_locally_deleted(db, tbl_info, key, db_version, seq, &ts)?; - // now actually delete the row metadata - let drop_clocks_stmt_ref = tbl_info - .get_merge_delete_drop_clocks_stmt(db) - .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; - let drop_clocks_stmt = drop_clocks_stmt_ref - .as_ref() - .ok_or("Failed to deref sentinel stmt")?; + { + // now actually delete the row metadata + let drop_clocks_stmt_ref = tbl_info + .get_merge_delete_drop_clocks_stmt(db) + .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; + let drop_clocks_stmt = drop_clocks_stmt_ref + .as_ref() + .ok_or("Failed to deref sentinel stmt")?; - drop_clocks_stmt - .bind_int64(1, key) - .map_err(|_e| "failed to bind pks to drop_clocks_stmt")?; - super::step_trigger_stmt(drop_clocks_stmt)?; + drop_clocks_stmt + .bind_int64(1, key) + .map_err(|_e| "failed to bind pks to drop_clocks_stmt")?; + super::step_trigger_stmt(drop_clocks_stmt)?; + } - let mut cl_cache = unsafe { - mem::ManuallyDrop::new(Box::from_raw( - (*ext_data).clCache as *mut BTreeMap>, - )) - }; - cl_cache - .entry(tbl_info.tbl_name.clone()) - .or_default() - .insert(key, cl); + tbl_info.set_cl(key, cl); Ok(ResultCode::OK) } diff --git a/core/rs/core/src/local_writes/after_insert.rs b/core/rs/core/src/local_writes/after_insert.rs index b6482a102..01cac10d2 100644 --- a/core/rs/core/src/local_writes/after_insert.rs +++ b/core/rs/core/src/local_writes/after_insert.rs @@ -1,9 +1,6 @@ -use alloc::boxed::Box; -use alloc::collections::BTreeMap; use alloc::string::String; use alloc::string::ToString; use core::ffi::c_int; -use core::mem; use sqlite::sqlite3; use sqlite::value; use sqlite::Context; @@ -40,7 +37,7 @@ pub unsafe extern "C" fn x_crsql_after_insert( fn after_insert( db: *mut sqlite3, ext_data: *mut crsql_ExtData, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, pks_new: &[*mut value], ) -> Result { let ts = unsafe { (*ext_data).timestamp.to_string() }; @@ -57,16 +54,7 @@ fn after_insert( // update the create record since it already exists. let seq = bump_seq(ext_data); let col_version = update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; - - let mut cl_cache = unsafe { - mem::ManuallyDrop::new(Box::from_raw( - (*ext_data).clCache as *mut BTreeMap>, - )) - }; - cl_cache - .entry(tbl_info.tbl_name.clone()) - .or_default() - .insert(key_new, col_version); + tbl_info.set_cl(key_new, col_version); } super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; diff --git a/core/rs/core/src/local_writes/after_update.rs b/core/rs/core/src/local_writes/after_update.rs index 47fb4494f..a702aaee1 100644 --- a/core/rs/core/src/local_writes/after_update.rs +++ b/core/rs/core/src/local_writes/after_update.rs @@ -1,10 +1,8 @@ use core::ffi::c_int; -use core::mem; use alloc::format; use alloc::string::String; use alloc::string::ToString; -use alloc::{boxed::Box, collections::BTreeMap}; use sqlite::{sqlite3, value, Context, ResultCode}; use sqlite_nostd as sqlite; @@ -68,7 +66,7 @@ fn partition_values( fn after_update( db: *mut sqlite3, ext_data: *mut crsql_ExtData, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, pks_new: &[*mut value], pks_old: &[*mut value], non_pks_new: &[*mut value], @@ -145,15 +143,7 @@ fn after_update( } if let Some((old_key, cl)) = cl_info { - let mut cl_cache = unsafe { - mem::ManuallyDrop::new(Box::from_raw( - (*ext_data).clCache as *mut BTreeMap>, - )) - }; - cl_cache - .entry(tbl_info.tbl_name.clone()) - .or_default() - .insert(old_key, cl); + tbl_info.set_cl(old_key, cl); } Ok(ResultCode::OK) diff --git a/core/rs/core/src/local_writes/mod.rs b/core/rs/core/src/local_writes/mod.rs index 6a987f262..c7fae9e38 100644 --- a/core/rs/core/src/local_writes/mod.rs +++ b/core/rs/core/src/local_writes/mod.rs @@ -26,7 +26,7 @@ fn trigger_fn_preamble( f: F, ) -> Result where - F: Fn(&TableInfo, &[*mut sqlite::value], *mut crsql_ExtData) -> Result, + F: Fn(&mut TableInfo, &[*mut sqlite::value], *mut crsql_ExtData) -> Result, { if argc < 1 { return Err("expected at least 1 argument".to_string()); @@ -45,10 +45,10 @@ where )); } - let table_infos = + let mut table_infos = unsafe { ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec)) }; let table_name = values[0].text(); - let table_info = match table_infos.iter().find(|t| &(t.tbl_name) == table_name) { + let table_info = match table_infos.iter_mut().find(|t| &(t.tbl_name) == table_name) { Some(t) => t, None => { return Err(format!("table {} not found", table_name)); diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 3b8081f65..a4f81f97f 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -7,6 +7,7 @@ use crate::pack_columns::ColumnValue; use crate::stmt_cache::reset_cached_stmt; use crate::util::Countable; use alloc::boxed::Box; +use alloc::collections::BTreeMap; use alloc::format; use alloc::string::String; use alloc::vec; @@ -66,9 +67,24 @@ pub struct TableInfo { move_non_sentinels_stmt: RefCell>, mark_locally_created_stmt: RefCell>, maybe_mark_locally_reinserted_stmt: RefCell>, + cl_cache: BTreeMap, } impl TableInfo { + pub fn get_cl(&self, key: i64) -> Option<&i64> { + self.cl_cache.get(&key) + } + + pub fn set_cl(&mut self, key: i64, cl: i64) { + self.cl_cache.insert(key, cl); + } + + pub fn clear_cl_cache(&mut self) { + if !self.cl_cache.is_empty() { + self.cl_cache.clear(); + } + } + fn find_non_pk_col(&self, col_name: &str) -> Result<&ColumnInfo, ResultCode> { for col in &self.non_pks { if col.name == col_name { @@ -1009,6 +1025,7 @@ pub fn pull_table_info( select_clock_stmt: RefCell::new(None), insert_clock_stmt: RefCell::new(None), update_clock_stmt: RefCell::new(None), + cl_cache: BTreeMap::new(), }) } From c2941467e6c47417cb21cfdc85dd8de26b76bea3 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 6 Nov 2025 20:12:24 +0100 Subject: [PATCH 3/5] clear cache if we are over limit Signed-off-by: Somtochi Onyekwere --- core/rs/core/src/c.rs | 2 +- core/rs/core/src/lib.rs | 6 +++++- core/rs/core/src/tableinfo.rs | 5 +++++ core/src/ext-data.c | 4 ---- core/src/ext-data.h | 3 --- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/rs/core/src/c.rs b/core/rs/core/src/c.rs index eeb3f784c..3a1539d96 100644 --- a/core/rs/core/src/c.rs +++ b/core/rs/core/src/c.rs @@ -271,7 +271,7 @@ fn bindgen_test_layout_crsql_ExtData() { let ptr = UNINIT.as_ptr(); assert_eq!( ::core::mem::size_of::(), - 176usize, + 168usize, concat!("Size of: ", stringify!(crsql_ExtData)) ); assert_eq!( diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 71254354f..e0c2e4f04 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -74,10 +74,12 @@ use sqlite::{Destructor, ResultCode}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context, Value}; use tableinfo::{ - crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info, TableInfo, + crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info, }; use teardown::*; use triggers::create_triggers; +#[cfg(feature = "test")] +use tableinfo::TableInfo; pub use debug::debug_log; @@ -469,6 +471,7 @@ pub extern "C" fn sqlite3_crsqlcore_init( return null_mut(); } + #[cfg(feature = "test")] if let Err(_) = db.create_function_v2( "crsql_cache_pk_cl", 2, @@ -992,6 +995,7 @@ unsafe extern "C" fn x_crsql_cache_site_ordinal( * Get the pk cl cached in the ext data for the current transaction. * only used for test to inspect the cl cache. */ +#[cfg(feature = "test")] unsafe extern "C" fn x_crsql_cache_pk_cl( ctx: *mut sqlite::context, argc: i32, diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index a4f81f97f..0415ebd9b 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -28,6 +28,7 @@ use sqlite_nostd::ResultCode; use sqlite_nostd::Stmt; use sqlite_nostd::StrRef; +const MAX_CL_CACHE_SIZE: usize = 1500; pub struct TableInfo { pub tbl_name: String, pub pks: Vec, @@ -76,6 +77,10 @@ impl TableInfo { } pub fn set_cl(&mut self, key: i64, cl: i64) { + // clear the cache if we are over limit + if self.cl_cache.len() >= MAX_CL_CACHE_SIZE { + self.cl_cache.clear(); + } self.cl_cache.insert(key, cl); } diff --git a/core/src/ext-data.c b/core/src/ext-data.c index 65d0ef25e..5ae04582d 100644 --- a/core/src/ext-data.c +++ b/core/src/ext-data.c @@ -12,8 +12,6 @@ void crsql_drop_ordinal_map(crsql_ExtData *pExtData); void crsql_drop_table_info_vec(crsql_ExtData *pExtData); void crsql_init_last_db_versions_map(crsql_ExtData *pExtData); void crsql_drop_last_db_versions_map(crsql_ExtData *pExtData); -void crsql_init_cl_cache(crsql_ExtData *pExtData); -void crsql_drop_cl_cache(crsql_ExtData *pExtData); // void crsql_init_table_info_vec(crsql_ExtData *pExtData); // void crsql_drop_table_info_vec(crsql_ExtData *pExtData); @@ -77,7 +75,6 @@ crsql_ExtData *crsql_newExtData(sqlite3 *db) { crsql_init_table_info_vec(pExtData); crsql_init_last_db_versions_map(pExtData); crsql_init_ordinal_map(pExtData); - crsql_init_cl_cache(pExtData); sqlite3_stmt *pStmt; @@ -170,7 +167,6 @@ void crsql_freeExtData(crsql_ExtData *pExtData) { crsql_drop_table_info_vec(pExtData); crsql_drop_last_db_versions_map(pExtData); crsql_drop_ordinal_map(pExtData); - crsql_drop_cl_cache(pExtData); sqlite3_free(pExtData); } diff --git a/core/src/ext-data.h b/core/src/ext-data.h index 155192eeb..2d8dd9b41 100644 --- a/core/src/ext-data.h +++ b/core/src/ext-data.h @@ -49,7 +49,6 @@ struct crsql_ExtData { int mergeEqualValues; unsigned long long timestamp; void *ordinalMap; - void *clCache; }; crsql_ExtData *crsql_newExtData(sqlite3 *db); @@ -62,7 +61,5 @@ int crsql_recreate_db_version_stmt(sqlite3 *db, crsql_ExtData *pExtData); void crsql_finalize(crsql_ExtData *pExtData); void crsql_init_ordinal_map(crsql_ExtData *pExtData); void crsql_drop_ordinal_map(crsql_ExtData *pExtData); -void crsql_init_cl_cache(crsql_ExtData *pExtData); -void crsql_drop_cl_cache(crsql_ExtData *pExtData); #endif From 3e5507495e97ce0789e0bc48d2b392e5c9fb4e54 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 6 Nov 2025 23:46:18 +0100 Subject: [PATCH 4/5] handle case where clock row is missing Signed-off-by: Somtochi Onyekwere --- core/rs/core/src/lib.rs | 8 +++----- core/rs/core/src/local_writes/after_insert.rs | 15 +++++++++++---- .../integration_check/src/t/test_db_version.rs | 16 +++++++++++++++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index e0c2e4f04..368ce6b54 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -73,13 +73,11 @@ use local_writes::after_update::x_crsql_after_update; use sqlite::{Destructor, ResultCode}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context, Value}; -use tableinfo::{ - crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info, -}; -use teardown::*; -use triggers::create_triggers; #[cfg(feature = "test")] use tableinfo::TableInfo; +use tableinfo::{crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info}; +use teardown::*; +use triggers::create_triggers; pub use debug::debug_log; diff --git a/core/rs/core/src/local_writes/after_insert.rs b/core/rs/core/src/local_writes/after_insert.rs index 01cac10d2..c76b5f1cb 100644 --- a/core/rs/core/src/local_writes/after_insert.rs +++ b/core/rs/core/src/local_writes/after_insert.rs @@ -53,8 +53,10 @@ fn after_insert( } else if create_record_existed { // update the create record since it already exists. let seq = bump_seq(ext_data); - let col_version = update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; - tbl_info.set_cl(key_new, col_version); + let cl = update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; + if let Some(cl) = cl { + tbl_info.set_cl(key_new, cl); + } } super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; @@ -69,7 +71,7 @@ fn update_create_record( db_version: sqlite::int64, seq: i32, ts: &str, -) -> Result { +) -> Result, String> { let update_create_record_stmt_ref = tbl_info .get_maybe_mark_locally_reinserted_stmt(db) .map_err(|_e| "failed to get update_create_record_stmt")?; @@ -97,7 +99,12 @@ fn update_create_record( let col_version = update_create_record_stmt.column_int64(0); super::reset_cached_stmt(update_create_record_stmt.stmt) .map_err(|_e| "failed to reset cached stmt")?; - Ok(col_version) + Ok(Some(col_version)) + } + Ok(ResultCode::DONE) => { + super::reset_cached_stmt(update_create_record_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + Ok(None) } _ => { super::reset_cached_stmt(update_create_record_stmt.stmt) diff --git a/core/rs/integration_check/src/t/test_db_version.rs b/core/rs/integration_check/src/t/test_db_version.rs index 9e00ae032..6eee9230d 100644 --- a/core/rs/integration_check/src/t/test_db_version.rs +++ b/core/rs/integration_check/src/t/test_db_version.rs @@ -207,6 +207,8 @@ fn test_get_or_set_pk_cl() -> Result<(), ResultCode> { insert_foo_row(&insert_foo_stmt, 2, "c")?; + insert_foo_row(&insert_foo_stmt, 4, "d")?; + let key1 = get_pk_key(&get_pk_key_stmt, 1).expect("get pk key"); let key2 = get_pk_key(&get_pk_key_stmt, 2).expect("get pk key"); @@ -228,13 +230,25 @@ fn test_get_or_set_pk_cl() -> Result<(), ResultCode> { // pk 1 and 2 should have a cl of 2 since they have been deleted assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key1)?); - assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); // reinsert pk 2, check cl is 3 insert_foo_row(&insert_foo_stmt, 2, "d")?; assert_eq!(3, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + // check insert or replace updates the cache + let insert_or_replace = db + .db + .prepare_v2("INSERT OR REPLACE INTO foo VALUES (?, ?);")?; + insert_or_replace.bind_int64(1, 4)?; + insert_or_replace.bind_text(2, "c", sqlite::Destructor::STATIC)?; + insert_or_replace.step()?; + reset_cached_stmt(&insert_or_replace)?; + + // insert of pk with no clock row gets no update + let key4 = get_pk_key(&get_pk_key_stmt, 4).expect("get pk key"); + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + db.db.exec_safe("COMMIT;")?; // commit clears the cache From 7159d4afd133fc3f49e0294b339440107638d0c6 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 7 Nov 2025 11:48:28 +0100 Subject: [PATCH 5/5] update sentinels for pk only rows too Signed-off-by: Somtochi Onyekwere --- core/rs/core/src/local_writes/after_insert.rs | 50 +++++++++---------- core/rs/core/src/local_writes/after_update.rs | 2 +- core/rs/core/src/local_writes/mod.rs | 33 +++++++----- core/rs/core/src/tableinfo.rs | 4 +- 4 files changed, 50 insertions(+), 39 deletions(-) diff --git a/core/rs/core/src/local_writes/after_insert.rs b/core/rs/core/src/local_writes/after_insert.rs index c76b5f1cb..2a4cf4e89 100644 --- a/core/rs/core/src/local_writes/after_insert.rs +++ b/core/rs/core/src/local_writes/after_insert.rs @@ -46,20 +46,27 @@ fn after_insert( let (create_record_existed, key_new) = tbl_info .get_or_create_key_for_insert(db, pks_new) .map_err(|_| "failed getting or creating lookaside key")?; - if tbl_info.non_pks.is_empty() { + + let cl = if tbl_info.non_pks.is_empty() { let seq = bump_seq(ext_data); // just a sentinel record - return super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq, &ts); - } else if create_record_existed { - // update the create record since it already exists. - let seq = bump_seq(ext_data); - let cl = update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; - if let Some(cl) = cl { - tbl_info.set_cl(key_new, cl); - } - } + let cl = super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq, &ts)?; + Some(cl) + } else { + let cl = if create_record_existed { + // update the create record since it already exists. + let seq = bump_seq(ext_data); + update_create_record(db, tbl_info, key_new, db_version, seq, &ts)? + } else { + None + }; + super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; + cl + }; - super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; + if let Some(cl) = cl { + tbl_info.set_cl(key_new, cl); + } Ok(ResultCode::OK) } @@ -94,22 +101,15 @@ fn update_create_record( .map_err(|_e| "failed binding to update_create_record_stmt")?; let res = update_create_record_stmt.step(); - match res { + let result = match res { Ok(ResultCode::ROW) => { let col_version = update_create_record_stmt.column_int64(0); - super::reset_cached_stmt(update_create_record_stmt.stmt) - .map_err(|_e| "failed to reset cached stmt")?; Ok(Some(col_version)) } - Ok(ResultCode::DONE) => { - super::reset_cached_stmt(update_create_record_stmt.stmt) - .map_err(|_e| "failed to reset cached stmt")?; - Ok(None) - } - _ => { - super::reset_cached_stmt(update_create_record_stmt.stmt) - .map_err(|_e| "failed to reset cached stmt")?; - Err("failed to step update_create_record_stmt".to_string()) - } - } + Ok(ResultCode::DONE) => Ok(None), + _ => Err("failed to step update_create_record_stmt".to_string()), + }; + super::reset_cached_stmt(update_create_record_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + result } diff --git a/core/rs/core/src/local_writes/after_update.rs b/core/rs/core/src/local_writes/after_update.rs index a702aaee1..b75b55ef0 100644 --- a/core/rs/core/src/local_writes/after_update.rs +++ b/core/rs/core/src/local_writes/after_update.rs @@ -85,7 +85,7 @@ fn after_update( if crate::compare_values::any_value_changed(pks_new, pks_old)? { let old_key = tbl_info .get_or_create_key_via_raw_values(db, pks_old) - .map_err(|_| "failed geteting or creating lookaside key")?; + .map_err(|_| "failed getting or creating lookaside key")?; let next_seq = super::bump_seq(ext_data); changed = true; // Record the delete of the row identified by the old primary keys diff --git a/core/rs/core/src/local_writes/mod.rs b/core/rs/core/src/local_writes/mod.rs index c7fae9e38..f14100e9f 100644 --- a/core/rs/core/src/local_writes/mod.rs +++ b/core/rs/core/src/local_writes/mod.rs @@ -84,7 +84,7 @@ fn mark_new_pk_row_created( db_version: i64, seq: i32, ts: &str, -) -> Result { +) -> Result { let mark_locally_created_stmt_ref = tbl_info .get_mark_locally_created_stmt(db) .map_err(|_e| "failed to get mark_locally_created_stmt")?; @@ -98,7 +98,17 @@ fn mark_new_pk_row_created( .and_then(|_| mark_locally_created_stmt.bind_int(3, seq)) .and_then(|_| mark_locally_created_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to mark_locally_created_stmt")?; - step_trigger_stmt(mark_locally_created_stmt) + let stmt_res = mark_locally_created_stmt.step(); + let result = match stmt_res { + Ok(ResultCode::ROW) => { + let cl = mark_locally_created_stmt.column_int64(0); + Ok(cl) + } + _ => Err("failed to step mark locally created stmt".to_string()), + }; + reset_cached_stmt(mark_locally_created_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + result } fn bump_seq(ext_data: *mut crsql_ExtData) -> c_int { @@ -291,19 +301,18 @@ fn mark_locally_deleted( .and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to mark locally deleted stmt")?; - let res = mark_locally_deleted_stmt.step(); + let stmt_res = mark_locally_deleted_stmt.step(); - match res { + let result = match stmt_res { Ok(ResultCode::ROW) => { let cl = mark_locally_deleted_stmt.column_int64(0); - reset_cached_stmt(mark_locally_deleted_stmt.stmt) - .map_err(|_e| "failed to reset cached stmt")?; Ok(cl) } - _ => { - reset_cached_stmt(mark_locally_deleted_stmt.stmt) - .map_err(|_e| "failed to reset cached stmt")?; - Err("failed to step mark locally deleted stmt".to_string()) - } - } + _ => Err("failed to step mark locally deleted stmt".to_string()), + }; + + reset_cached_stmt(mark_locally_deleted_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + + result } diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 0415ebd9b..081d54b3a 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -28,6 +28,7 @@ use sqlite_nostd::ResultCode; use sqlite_nostd::Stmt; use sqlite_nostd::StrRef; +// TODO: make this configurable with a crsql_config_set. const MAX_CL_CACHE_SIZE: usize = 1500; pub struct TableInfo { pub tbl_name: String, @@ -561,7 +562,8 @@ impl TableInfo { db_version = excluded.db_version, seq = excluded.seq, site_id = 0, - ts = excluded.ts", + ts = excluded.ts + RETURNING col_version", table_name = crate::util::escape_ident(&self.tbl_name), sentinel = crate::c::INSERT_SENTINEL, );