Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions nexus-common/src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ pub fn get_post_bookmarks(author_id: &str, post_id: &str) -> Query {
.param("post_id", post_id)
}

// Read the target (post_id, author_id) for a bookmark without deleting the edge.
// Used in sync_del to read before graph-last deletion.
pub fn get_bookmark_target(user_id: &str, bookmark_id: &str) -> Query {
    Query::new(
        "get_bookmark_target",
        // Single pattern: bookmarker -> bookmarked post -> post author.
        // Yields zero rows when the BOOKMARKED edge is absent, which the caller
        // (Bookmark::get_target_from_graph) maps to None.
        "MATCH (u:User {id: $user_id})-[b:BOOKMARKED {id: $bookmark_id}]->(post:Post)<-[:AUTHORED]-(author:User)
        RETURN post.id AS post_id, author.id AS author_id",
    )
    .param("user_id", user_id)
    .param("bookmark_id", bookmark_id)
}

// Get all the reposts that a post has received (used for edit/delete notifications)
pub fn get_post_reposts(author_id: &str, post_id: &str) -> Query {
Query::new(
Expand Down
18 changes: 18 additions & 0 deletions nexus-common/src/models/post/bookmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ impl Bookmark {
Ok(())
}

/// Reads (post_id, author_id) for a bookmark from the graph without deleting the edge.
pub async fn get_target_from_graph(
user_id: &str,
bookmark_id: &str,
) -> GraphResult<Option<(String, String)>> {
let query = queries::get::get_bookmark_target(user_id, bookmark_id);
let rows = fetch_all_rows_from_graph(query).await?;

for row in rows {
let post_id: Option<String> = row.get("post_id").unwrap_or(None);
let author_id: Option<String> = row.get("author_id").unwrap_or(None);
if let (Some(post_id), Some(author_id)) = (post_id, author_id) {
return Ok(Some((post_id, author_id)));
}
}
Ok(None)
}

pub async fn del_from_graph(
user_id: &str,
bookmark_id: &str,
Expand Down
21 changes: 16 additions & 5 deletions nexus-watcher/src/events/handlers/bookmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,27 @@ pub async fn del(user_id: PubkyId, bookmark_id: String) -> Result<(), EventProce
}

pub async fn sync_del(user_id: PubkyId, bookmark_id: String) -> Result<(), EventProcessorError> {
let deleted_bookmark_info = Bookmark::del_from_graph(&user_id, &bookmark_id).await?;
// Ensure the bookmark exists in the graph before proceeding
let (post_id, author_id) = match deleted_bookmark_info {
// 1. Read target from graph WITHOUT deleting the edge
let (post_id, author_id) = match Bookmark::get_target_from_graph(&user_id, &bookmark_id).await?
{
Some(info) => info,
None => return Err(EventProcessorError::SkipIndexing),
};
Comment on lines +63 to 67
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let (post_id, author_id) = match Bookmark::get_target_from_graph(&user_id, &bookmark_id).await?
{
Some(info) => info,
None => return Err(EventProcessorError::SkipIndexing),
};
let (post_id, author_id) = Bookmark::get_target_from_graph(&user_id, &bookmark_id)
.await?
.ok_or(EventProcessorError::SkipIndexing)?;


// 2. Guard counter decrement: only decrement if bookmark still exists in Redis index
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The guard is designed to prevent double-decrement on retry, it it only handles the case where both the Redis index removal and the counter decrement succeed before the graph delete failed.

If del_from_index (step 3) succeeds, but the process crashes before UserCounts::decrement runs, then on retry existed_in_index will be false and the counter will never be decremented.

let existed_in_index = Bookmark::get_from_index(&author_id, &post_id, &user_id)
.await?
.is_some();

// 3. Redis cleanup (idempotent)
Bookmark::del_from_index(&user_id, &post_id, &author_id).await?;
// Update user counts
UserCounts::decrement(&user_id, "bookmarks", None).await?;

if existed_in_index {
UserCounts::decrement(&user_id, "bookmarks", None).await?;
}

// 4. Graph deletion LAST — ensures data survives for retry if Redis ops fail
Bookmark::del_from_graph(&user_id, &bookmark_id).await?;

Ok(())
}
151 changes: 151 additions & 0 deletions nexus-watcher/tests/event_processor/bookmarks/del_idempotent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use super::utils::find_post_bookmark;
use crate::event_processor::utils::watcher::WatcherTest;
use crate::event_processor::{
users::utils::find_user_counts, utils::watcher::HomeserverHashIdPath,
};
use anyhow::Result;
use nexus_common::models::event::EventProcessorError;
use nexus_common::models::post::Bookmark;
use nexus_common::models::user::UserCounts;
use nexus_watcher::events::handlers;
use pubky::Keypair;
use pubky_app_specs::{
post_uri_builder, traits::HashId, PubkyAppBookmark, PubkyAppPost, PubkyAppUser, PubkyId,
};

/// Simulate a retry of sync_del after a partial failure where Redis cleanup
/// succeeded but graph deletion failed. On retry, the counter must NOT be
/// decremented again (guarded by the Redis index check).
#[tokio_shared_rt::test(shared)]
async fn test_bookmark_del_retry_no_double_decrement() -> Result<()> {
    let mut test = WatcherTest::setup().await?;

    // Create user + post + bookmark through the normal watcher flow
    // NOTE(review): this user is both the post author and the bookmarker;
    // consider a second keypair so author_id != bookmarker_id is also covered.
    let user_kp = Keypair::random();
    let user = PubkyAppUser {
        bio: Some("test_bookmark_del_retry_no_double_decrement".to_string()),
        image: None,
        links: None,
        name: "Watcher:Bookmark:DelRetry:User".to_string(),
        status: None,
    };
    let user_id = test.create_user(&user_kp, &user).await?;

    let post = PubkyAppPost {
        content: "Watcher:Bookmark:DelRetry:Post".to_string(),
        kind: PubkyAppPost::default().kind,
        parent: None,
        embed: None,
        attachments: None,
    };
    let (post_id, post_path) = test.create_post(&user_kp, &post).await?;

    let bookmark = PubkyAppBookmark {
        uri: post_uri_builder(user_id.clone(), post_id.clone()),
        created_at: chrono::Utc::now().timestamp_millis(),
    };
    let bookmark_id = bookmark.create_id();
    let bookmark_path = bookmark.hs_path();

    test.put(&user_kp, &bookmark_path, bookmark).await?;

    // Verify initial state: bookmark exists in graph + Redis, count = 1
    assert!(find_post_bookmark(&user_id, &post_id, &user_id)
        .await
        .is_ok());
    assert!(Bookmark::get_from_index(&user_id, &post_id, &user_id)
        .await?
        .is_some());
    assert_eq!(find_user_counts(&user_id).await.bookmarks, 1);

    // Simulate partial completion of a previous sync_del attempt:
    // Redis cleanup succeeded (index removed + counter decremented) but graph delete failed.
    Bookmark::del_from_index(&user_id, &post_id, &user_id).await?;
    UserCounts::decrement(&user_id, "bookmarks", None).await?;

    // Verify simulated state: graph still has edge, Redis is clean, counter = 0
    assert!(find_post_bookmark(&user_id, &post_id, &user_id)
        .await
        .is_ok());
    assert!(Bookmark::get_from_index(&user_id, &post_id, &user_id)
        .await?
        .is_none());
    assert_eq!(find_user_counts(&user_id).await.bookmarks, 0);

    // Retry: call sync_del directly — should complete graph cleanup without double-decrement
    let user_pubky_id = PubkyId::try_from(user_id.as_str()).map_err(anyhow::Error::msg)?;
    handlers::bookmark::sync_del(user_pubky_id, bookmark_id).await?;

    // Verify final state: graph edge deleted, counter still 0 (not decremented again)
    assert!(find_post_bookmark(&user_id, &post_id, &user_id)
        .await
        .is_err());
    assert_eq!(find_user_counts(&user_id).await.bookmarks, 0);

    // Cleanup
    test.cleanup_post(&user_kp, &post_path).await?;
    test.cleanup_user(&user_kp).await?;

    Ok(())
}

/// After a fully successful delete, a replay of sync_del should return Ok
#[tokio_shared_rt::test(shared)]
async fn test_bookmark_del_replay_after_success_skips() -> Result<()> {
let mut test = WatcherTest::setup().await?;

let user_kp = Keypair::random();
let user = PubkyAppUser {
bio: Some("test_bookmark_del_replay_after_success_skips".to_string()),
image: None,
links: None,
name: "Watcher:Bookmark:DelReplay:User".to_string(),
status: None,
};
let user_id = test.create_user(&user_kp, &user).await?;

let post = PubkyAppPost {
content: "Watcher:Bookmark:DelReplay:Post".to_string(),
kind: PubkyAppPost::default().kind,
parent: None,
embed: None,
attachments: None,
};
let (post_id, post_path) = test.create_post(&user_kp, &post).await?;

let bookmark = PubkyAppBookmark {
uri: post_uri_builder(user_id.clone(), post_id.clone()),
created_at: chrono::Utc::now().timestamp_millis(),
};
let bookmark_id = bookmark.create_id();
let bookmark_path = bookmark.hs_path();

test.put(&user_kp, &bookmark_path, bookmark).await?;

// Delete through normal event flow
test.del(&user_kp, &bookmark_path).await?;

// Verify fully deleted state
assert!(find_post_bookmark(&user_id, &post_id, &user_id)
Copy link
Copy Markdown
Contributor

@ok300 ok300 Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test conflates the post author and the bookmarker, which means it doesn't cover the case when post author != bookmarker, which is a more realistic scenario. It would be better if the test creates 2 separate users, author_id and bookmarker_id.

.await
.is_err());
assert_eq!(find_user_counts(&user_id).await.bookmarks, 0);

// Replay: call sync_del again — should get SkipIndexing (graph edge gone)
let user_pubky_id = PubkyId::try_from(user_id.as_str()).map_err(anyhow::Error::msg)?;
let result = handlers::bookmark::sync_del(user_pubky_id, bookmark_id).await;

assert!(
matches!(result, Err(EventProcessorError::SkipIndexing)),
"Replay after full delete should return SkipIndexing, got: {result:?}"
);

// Counter must remain 0
assert_eq!(find_user_counts(&user_id).await.bookmarks, 0);

// Cleanup
test.cleanup_post(&user_kp, &post_path).await?;
test.cleanup_user(&user_kp).await?;

Ok(())
}
1 change: 1 addition & 0 deletions nexus-watcher/tests/event_processor/bookmarks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod del;
// Idempotency / retry scenarios for bookmark deletion (sync_del replay safety).
mod del_idempotent;
mod fail_index;
mod raw;
mod retry_bookmark;
Expand Down
Loading