diff --git a/nexus-common/src/db/graph/queries/get.rs b/nexus-common/src/db/graph/queries/get.rs index 8f2866dd5..3c0bbe520 100644 --- a/nexus-common/src/db/graph/queries/get.rs +++ b/nexus-common/src/db/graph/queries/get.rs @@ -90,6 +90,18 @@ pub fn get_post_bookmarks(author_id: &str, post_id: &str) -> Query { .param("post_id", post_id) } +// Read the target (post_id, author_id) for a bookmark without deleting the edge. +// Used in sync_del to read before graph-last deletion. +pub fn get_bookmark_target(user_id: &str, bookmark_id: &str) -> Query { + Query::new( + "get_bookmark_target", + "MATCH (u:User {id: $user_id})-[b:BOOKMARKED {id: $bookmark_id}]->(post:Post)<-[:AUTHORED]-(author:User) + RETURN post.id AS post_id, author.id AS author_id", + ) + .param("user_id", user_id) + .param("bookmark_id", bookmark_id) +} + // Get all the reposts that a post has received (used for edit/delete notifications) pub fn get_post_reposts(author_id: &str, post_id: &str) -> Query { Query::new( diff --git a/nexus-common/src/models/post/bookmark.rs b/nexus-common/src/models/post/bookmark.rs index dba200d04..b7d1b97fd 100644 --- a/nexus-common/src/models/post/bookmark.rs +++ b/nexus-common/src/models/post/bookmark.rs @@ -110,6 +110,24 @@ impl Bookmark { Ok(()) } + /// Reads (post_id, author_id) for a bookmark from the graph without deleting the edge. + pub async fn get_target_from_graph( + user_id: &str, + bookmark_id: &str, + ) -> GraphResult> { + let query = queries::get::get_bookmark_target(user_id, bookmark_id); + let rows = fetch_all_rows_from_graph(query).await?; + + for row in rows { + let post_id: Option = row.get("post_id").unwrap_or(None); + let author_id: Option = row.get("author_id").unwrap_or(None); + if let (Some(post_id), Some(author_id)) = (post_id, author_id) { + return Ok(Some((post_id, author_id))); + } + } + Ok(None) + } + pub async fn del_from_graph( user_id: &str, bookmark_id: &str, diff --git a/nexus-watcher/src/events/handlers/bookmark.rs b/nexus-watcher/src/events/handlers/bookmark.rs index 6b2171bb7..db2db29a1 100644 --- a/nexus-watcher/src/events/handlers/bookmark.rs +++ b/nexus-watcher/src/events/handlers/bookmark.rs @@ -59,16 +59,27 @@ pub async fn del(user_id: PubkyId, bookmark_id: String) -> Result<(), EventProce } pub async fn sync_del(user_id: PubkyId, bookmark_id: String) -> Result<(), EventProcessorError> { - let deleted_bookmark_info = Bookmark::del_from_graph(&user_id, &bookmark_id).await?; - // Ensure the bookmark exists in the graph before proceeding - let (post_id, author_id) = match deleted_bookmark_info { + // 1. Read target from graph WITHOUT deleting the edge + let (post_id, author_id) = match Bookmark::get_target_from_graph(&user_id, &bookmark_id).await? + { Some(info) => info, None => return Err(EventProcessorError::SkipIndexing), }; + // 2. Guard counter decrement: only decrement if bookmark still exists in Redis index + let existed_in_index = Bookmark::get_from_index(&author_id, &post_id, &user_id) + .await? + .is_some(); + + // 3. Redis cleanup (idempotent) Bookmark::del_from_index(&user_id, &post_id, &author_id).await?; - // Update user counts - UserCounts::decrement(&user_id, "bookmarks", None).await?; + + if existed_in_index { + UserCounts::decrement(&user_id, "bookmarks", None).await?; + } + + // 4. Graph deletion LAST — ensures data survives for retry if Redis ops fail + Bookmark::del_from_graph(&user_id, &bookmark_id).await?; Ok(()) } diff --git a/nexus-watcher/tests/event_processor/bookmarks/del_idempotent.rs b/nexus-watcher/tests/event_processor/bookmarks/del_idempotent.rs new file mode 100644 index 000000000..4e228809d --- /dev/null +++ b/nexus-watcher/tests/event_processor/bookmarks/del_idempotent.rs @@ -0,0 +1,151 @@ +use super::utils::find_post_bookmark; +use crate::event_processor::utils::watcher::WatcherTest; +use crate::event_processor::{ + users::utils::find_user_counts, utils::watcher::HomeserverHashIdPath, +}; +use anyhow::Result; +use nexus_common::models::event::EventProcessorError; +use nexus_common::models::post::Bookmark; +use nexus_common::models::user::UserCounts; +use nexus_watcher::events::handlers; +use pubky::Keypair; +use pubky_app_specs::{ + post_uri_builder, traits::HashId, PubkyAppBookmark, PubkyAppPost, PubkyAppUser, PubkyId, +}; + +/// Simulate a retry of sync_del after a partial failure where Redis cleanup +/// succeeded but graph deletion failed. On retry, the counter must NOT be +/// decremented again (guarded by the Redis index check). +#[tokio_shared_rt::test(shared)] +async fn test_bookmark_del_retry_no_double_decrement() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + // Create user + post + bookmark through the normal watcher flow + let user_kp = Keypair::random(); + let user = PubkyAppUser { + bio: Some("test_bookmark_del_retry_no_double_decrement".to_string()), + image: None, + links: None, + name: "Watcher:Bookmark:DelRetry:User".to_string(), + status: None, + }; + let user_id = test.create_user(&user_kp, &user).await?; + + let post = PubkyAppPost { + content: "Watcher:Bookmark:DelRetry:Post".to_string(), + kind: PubkyAppPost::default().kind, + parent: None, + embed: None, + attachments: None, + }; + let (post_id, post_path) = test.create_post(&user_kp, &post).await?; + + let bookmark = PubkyAppBookmark { + uri: post_uri_builder(user_id.clone(), post_id.clone()), + created_at: chrono::Utc::now().timestamp_millis(), + }; + let bookmark_id = bookmark.create_id(); + let bookmark_path = bookmark.hs_path(); + + test.put(&user_kp, &bookmark_path, bookmark).await?; + + // Verify initial state: bookmark exists in graph + Redis, count = 1 + assert!(find_post_bookmark(&user_id, &post_id, &user_id) + .await + .is_ok()); + assert!(Bookmark::get_from_index(&user_id, &post_id, &user_id) + .await? + .is_some()); + assert_eq!(find_user_counts(&user_id).await.bookmarks, 1); + + // Simulate partial completion of a previous sync_del attempt: + // Redis cleanup succeeded (index removed + counter decremented) but graph delete failed. + Bookmark::del_from_index(&user_id, &post_id, &user_id).await?; + UserCounts::decrement(&user_id, "bookmarks", None).await?; + + // Verify simulated state: graph still has edge, Redis is clean, counter = 0 + assert!(find_post_bookmark(&user_id, &post_id, &user_id) + .await + .is_ok()); + assert!(Bookmark::get_from_index(&user_id, &post_id, &user_id) + .await? + .is_none()); + assert_eq!(find_user_counts(&user_id).await.bookmarks, 0); + + // Retry: call sync_del directly — should complete graph cleanup without double-decrement + let user_pubky_id = PubkyId::try_from(user_id.as_str()).map_err(anyhow::Error::msg)?; + handlers::bookmark::sync_del(user_pubky_id, bookmark_id).await?; + + // Verify final state: graph edge deleted, counter still 0 (not decremented again) + assert!(find_post_bookmark(&user_id, &post_id, &user_id) + .await + .is_err()); + assert_eq!(find_user_counts(&user_id).await.bookmarks, 0); + + // Cleanup + test.cleanup_post(&user_kp, &post_path).await?; + test.cleanup_user(&user_kp).await?; + + Ok(()) +} + +/// After a fully successful delete, a replay of sync_del should return Ok +#[tokio_shared_rt::test(shared)] +async fn test_bookmark_del_replay_after_success_skips() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let user_kp = Keypair::random(); + let user = PubkyAppUser { + bio: Some("test_bookmark_del_replay_after_success_skips".to_string()), + image: None, + links: None, + name: "Watcher:Bookmark:DelReplay:User".to_string(), + status: None, + }; + let user_id = test.create_user(&user_kp, &user).await?; + + let post = PubkyAppPost { + content: "Watcher:Bookmark:DelReplay:Post".to_string(), + kind: PubkyAppPost::default().kind, + parent: None, + embed: None, + attachments: None, + }; + let (post_id, post_path) = test.create_post(&user_kp, &post).await?; + + let bookmark = PubkyAppBookmark { + uri: post_uri_builder(user_id.clone(), post_id.clone()), + created_at: chrono::Utc::now().timestamp_millis(), + }; + let bookmark_id = bookmark.create_id(); + let bookmark_path = bookmark.hs_path(); + + test.put(&user_kp, &bookmark_path, bookmark).await?; + + // Delete through normal event flow + test.del(&user_kp, &bookmark_path).await?; + + // Verify fully deleted state + assert!(find_post_bookmark(&user_id, &post_id, &user_id) + .await + .is_err()); + assert_eq!(find_user_counts(&user_id).await.bookmarks, 0); + + // Replay: call sync_del again — should get SkipIndexing (graph edge gone) + let user_pubky_id = PubkyId::try_from(user_id.as_str()).map_err(anyhow::Error::msg)?; + let result = handlers::bookmark::sync_del(user_pubky_id, bookmark_id).await; + + assert!( + matches!(result, Err(EventProcessorError::SkipIndexing)), + "Replay after full delete should return SkipIndexing, got: {result:?}" + ); + + // Counter must remain 0 + assert_eq!(find_user_counts(&user_id).await.bookmarks, 0); + + // Cleanup + test.cleanup_post(&user_kp, &post_path).await?; + test.cleanup_user(&user_kp).await?; + + Ok(()) +} diff --git a/nexus-watcher/tests/event_processor/bookmarks/mod.rs b/nexus-watcher/tests/event_processor/bookmarks/mod.rs index b77d94302..d1dec4651 100644 --- a/nexus-watcher/tests/event_processor/bookmarks/mod.rs +++ b/nexus-watcher/tests/event_processor/bookmarks/mod.rs @@ -1,4 +1,5 @@ mod del; +mod del_idempotent; mod fail_index; mod raw; mod retry_bookmark;