diff --git a/nexus-common/src/db/graph/queries/get.rs b/nexus-common/src/db/graph/queries/get.rs index 8f2866dd5..2acaf0c20 100644 --- a/nexus-common/src/db/graph/queries/get.rs +++ b/nexus-common/src/db/graph/queries/get.rs @@ -112,6 +112,23 @@ pub fn get_post_replies(author_id: &str, post_id: &str) -> Query { .param("post_id", post_id) } +// Read the target details for a tag without deleting the TAGGED edge. +// Used in tag del to read before graph-last deletion. +pub fn get_tag_target(user_id: &str, tag_id: &str) -> Query { + Query::new( + "get_tag_target", + "MATCH (user:User {id: $user_id})-[tag:TAGGED {id: $tag_id}]->(target) + OPTIONAL MATCH (target)<-[:AUTHORED]-(author:User) + WITH CASE WHEN target:User THEN target.id ELSE null END AS user_id, + CASE WHEN target:Post THEN target.id ELSE null END AS post_id, + CASE WHEN target:Post THEN author.id ELSE null END AS author_id, + tag.label AS label + RETURN user_id, post_id, author_id, label", + ) + .param("user_id", user_id) + .param("tag_id", tag_id) +} + // Get all the tags/taggers that a post has received (used for edit/delete notifications) pub fn get_post_tags(author_id: &str, post_id: &str) -> Query { Query::new( diff --git a/nexus-common/src/models/tag/traits/collection.rs b/nexus-common/src/models/tag/traits/collection.rs index 7d1b75612..83860fd86 100644 --- a/nexus-common/src/models/tag/traits/collection.rs +++ b/nexus-common/src/models/tag/traits/collection.rs @@ -401,6 +401,26 @@ where Ok(Some((user_id, post_id, author_id, label))) } + /// Reads tag target details from the graph without deleting the TAGGED edge. + /// Same return shape as `del_from_graph`. + async fn get_target_from_graph( + user_id: &str, + tag_id: &str, + ) -> GraphResult, Option, Option, String)>> { + let query = queries::get::get_tag_target(user_id, tag_id); + let maybe_row = fetch_row_from_graph(query).await?; + + let Some(row) = maybe_row else { + return Ok(None); + }; + + let user_id: Option = row.get("user_id").unwrap_or(None); + let author_id: Option = row.get("author_id").unwrap_or(None); + let post_id: Option = row.get("post_id").unwrap_or(None); + let label: String = row.get("label").expect("Query should return tag label"); + Ok(Some((user_id, post_id, author_id, label))) + } + /// Returns the unique key parts used to identify a tag in the Redis database fn get_tag_prefix<'a>() -> [&'a str; 2]; diff --git a/nexus-watcher/src/events/handlers/tag.rs b/nexus-watcher/src/events/handlers/tag.rs index ff455f247..1e5d0c483 100644 --- a/nexus-watcher/src/events/handlers/tag.rs +++ b/nexus-watcher/src/events/handlers/tag.rs @@ -3,7 +3,7 @@ use crate::events::EventProcessorError; use chrono::Utc; use nexus_common::db::kv::ScoreAction; -use nexus_common::db::OperationOutcome; +use nexus_common::db::{OperationOutcome, RedisOps}; use nexus_common::models::homeserver::Homeserver; use nexus_common::models::notification::Notification; use nexus_common::models::post::search::PostsByTagSearch; @@ -78,7 +78,20 @@ async fn put_sync_post( ) .await? { - OperationOutcome::Updated => Ok(()), + OperationOutcome::Updated => { + // Re-run idempotent ops to recover from partial failure (graph wrote, Redis didn't) + let tag_label_slice = &[tag_label.to_string()]; + let idempotent_results = nexus_common::traced_join!( + tracing::info_span!("index.write", phase = "tag_post_retry"); + TagPost::add_tagger_to_index(&author_id, Some(post_id), &tagger_user_id, tag_label), + PostsByTagSearch::put_to_index(&author_id, post_id, tag_label), + TagSearch::put_to_index(tag_label_slice) + ); + idempotent_results.0?; + idempotent_results.1?; + idempotent_results.2?; + Ok(()) + } OperationOutcome::MissingDependency => { // Ensure that dependencies follow the same format as the RetryManager keys let dependency = vec![format!("{author_id}:posts:{post_id}")]; @@ -186,7 +199,18 @@ async fn put_sync_user( ) .await? { - OperationOutcome::Updated => Ok(()), + OperationOutcome::Updated => { + // Re-run idempotent ops to recover from partial failure (graph wrote, Redis didn't) + let tag_label_slice = &[tag_label.to_string()]; + let idempotent_results = nexus_common::traced_join!( + tracing::info_span!("index.write", phase = "tag_user_retry"); + TagUser::add_tagger_to_index(&tagged_user_id, None, &tagger_user_id, tag_label), + TagSearch::put_to_index(tag_label_slice) + ); + idempotent_results.0?; + idempotent_results.1?; + Ok(()) + } OperationOutcome::MissingDependency => { if let Err(e) = Homeserver::maybe_ingest_for_user(tagged_user_id.as_str()).await { tracing::error!("Failed to ingest homeserver: {e}"); @@ -238,26 +262,48 @@ async fn put_sync_user( #[tracing::instrument(name = "tag.del", skip_all, fields(user_id = %user_id, tag_id = %tag_id))] pub async fn del(user_id: PubkyId, tag_id: String) -> Result<(), EventProcessorError> { debug!("Deleting tag: {} -> {}", user_id, tag_id); - let tag_details = TagUser::del_from_graph(&user_id, &tag_id).await?; - // CHOOSE THE EVENT TYPE + + // 1. Read target from graph WITHOUT deleting the edge + let tag_details = TagUser::get_target_from_graph(&user_id, &tag_id).await?; + if let Some((tagged_user_id, post_id, author_id, label)) = tag_details { match (tagged_user_id, post_id, author_id) { // Delete user related indexes (Some(tagged_id), None, None) => { - del_sync_user(user_id, &tagged_id, &label).await?; + // Guard: check if tagger is still in Redis set (not yet cleaned on a prior attempt) + let tagger_in_index = + TagUser::check_set_member(&[tagged_id.as_str(), label.as_str()], &user_id) + .await? + .1; + del_sync_user(user_id.clone(), &tagged_id, &label, tagger_in_index).await?; } // Delete post related indexes (None, Some(post_id), Some(author_id)) => { - del_sync_post(user_id, &post_id, &author_id, &label).await?; + let tagger_in_index = TagPost::check_set_member( + &[author_id.as_str(), post_id.as_str(), label.as_str()], + &user_id, + ) + .await? + .1; + del_sync_post( + user_id.clone(), + &post_id, + &author_id, + &label, + tagger_in_index, + ) + .await?; } // Handle other unexpected cases _ => { debug!("DEL-Tag: Unexpected combination of tag details"); } } - } else { - return Err(EventProcessorError::SkipIndexing); + + // 3. Graph deletion LAST — ensures data survives for retry if Redis ops fail + TagUser::del_from_graph(&user_id, &tag_id).await?; } + // Edge already gone (fully completed on a prior attempt) — idempotent no-op Ok(()) } @@ -265,30 +311,48 @@ async fn del_sync_user( tagger_id: PubkyId, tagged_id: &str, tag_label: &str, + tagger_in_index: bool, ) -> Result<(), EventProcessorError> { let indexing_results = nexus_common::traced_join!( tracing::info_span!("index.delete", phase = "tag_user"); - // Update user counts in the tagged - UserCounts::decrement(tagged_id, "tags", None), - // Update user counts in the tagger - UserCounts::decrement(&tagger_id, "tagged", None), + // Guarded: Update user counts in the tagged async { - // Decrement label count to the user profile tag - TagUser::update_index_score(tagged_id, None, tag_label, ScoreAction::Decrement(1.0)).await?; - // Decrease unique_tags - // NOTE: To update that field, we first need to decrement the value in the TagUser SORTED SET associated with that tag - UserCounts::decrement(tagged_id, "unique_tags", Some(tag_label)).await?; + if tagger_in_index { + UserCounts::decrement(tagged_id, "tags", None).await?; + } Ok::<(), EventProcessorError>(()) }, + // Guarded: Update user counts in the tagger async { - // Remove tagger to the user taggers list + if tagger_in_index { + UserCounts::decrement(&tagger_id, "tagged", None).await?; + } + Ok::<(), EventProcessorError>(()) + }, + async { + if tagger_in_index { + // Decrement label count to the user profile tag + TagUser::update_index_score(tagged_id, None, tag_label, ScoreAction::Decrement(1.0)).await?; + // Decrease unique_tags + // NOTE: To update that field, we first need to decrement the value in the TagUser SORTED SET associated with that tag + UserCounts::decrement(tagged_id, "unique_tags", Some(tag_label)).await?; + } + Ok::<(), EventProcessorError>(()) + }, + async { + // Idempotent: Remove tagger from the user taggers list (SREM) TagUser(vec![tagger_id.to_string()]) .del_from_index(tagged_id, None, tag_label) .await?; Ok::<(), EventProcessorError>(()) }, - // Save new notification - Notification::new_user_untag(&tagger_id, tagged_id, tag_label) + // Guarded: notification + async { + if tagger_in_index { + Notification::new_user_untag(&tagger_id, tagged_id, tag_label).await?; + } + Ok::<(), EventProcessorError>(()) + } ); indexing_results.0?; @@ -305,56 +369,76 @@ async fn del_sync_post( post_id: &str, author_id: &str, tag_label: &str, + tagger_in_index: bool, ) -> Result<(), EventProcessorError> { - // SAVE TO INDEXES let post_key_slice: &[&str] = &[author_id, post_id]; let tag_post = TagPost(vec![tagger_id.to_string()]); let post_uri = post_uri_builder(author_id.to_string(), post_id.to_string()); let indexing_results = nexus_common::traced_join!( tracing::info_span!("index.delete", phase = "tag_post"); - // Update user counts for tagger - UserCounts::decrement(&tagger_id, "tagged", None), - // Decrement in one the post tags - PostCounts::decrement_index_field(post_key_slice, "tags", None), + // Guarded: Update user counts for tagger async { - // Decrement label score in the post - TagPost::update_index_score( - author_id, - Some(post_id), - tag_label, - ScoreAction::Decrement(1.0), - ) - .await?; - // Decrease unique_tag - // NOTE: To update that field, we first need to decrement the value in the SORTED SET associated with that tag - PostCounts::decrement_index_field(post_key_slice, "unique_tags", Some(tag_label)) + if tagger_in_index { + UserCounts::decrement(&tagger_id, "tagged", None).await?; + } + Ok::<(), EventProcessorError>(()) + }, + // Guarded: Decrement in one the post tags + async { + if tagger_in_index { + PostCounts::decrement_index_field(post_key_slice, "tags", None).await?; + } + Ok::<(), EventProcessorError>(()) + }, + async { + if tagger_in_index { + // Decrement label score in the post + TagPost::update_index_score( + author_id, + Some(post_id), + tag_label, + ScoreAction::Decrement(1.0), + ) + .await?; + // Decrease unique_tag + // NOTE: To update that field, we first need to decrement the value in the SORTED SET associated with that tag + PostCounts::decrement_index_field(post_key_slice, "unique_tags", Some(tag_label)) + .await?; + } + Ok::<(), EventProcessorError>(()) + }, + // Guarded: Decrease post from label total engagement + async { + if tagger_in_index { + PostsByTagSearch::update_index_score( + author_id, + post_id, + tag_label, + ScoreAction::Decrement(1.0), + ) .await?; + } Ok::<(), EventProcessorError>(()) }, - // Decrease post from label total engagement - PostsByTagSearch::update_index_score( - author_id, - post_id, - tag_label, - ScoreAction::Decrement(1.0), - ), async { - // Post replies cannot be included in the total engagement index once the tag have been deleted - if !post_relationships_is_reply(author_id, post_id).await? { - // Decrement in one post global engagement - PostStream::update_index_score(author_id, post_id, ScoreAction::Decrement(1.0)) - .await - .map_err(EventProcessorError::index_operation_failed)?; + if tagger_in_index { + // Post replies cannot be included in the total engagement index once the tag have been deleted + if !post_relationships_is_reply(author_id, post_id).await? { + // Decrement in one post global engagement + PostStream::update_index_score(author_id, post_id, ScoreAction::Decrement(1.0)) + .await + .map_err(EventProcessorError::index_operation_failed)?; + } } Ok::<(), EventProcessorError>(()) }, async { - // Delete the tagger from the tag list + // Idempotent: Delete the tagger from the tag list (SREM) tag_post .del_from_index(author_id, Some(post_id), tag_label) .await?; - // NOTE: The tag search index, depends on the post taggers collection to delete + // NOTE: The tag search index depends on the post taggers collection to delete // Delete post from global label timeline PostsByTagSearch::del_from_index(author_id, post_id, tag_label).await?; @@ -368,8 +452,13 @@ async fn del_sync_post( Ok::<(), EventProcessorError>(()) }, - // Save new notification - Notification::new_post_untag(&tagger_id, author_id, tag_label, &post_uri) + // Guarded: notification + async { + if tagger_in_index { + Notification::new_post_untag(&tagger_id, author_id, tag_label, &post_uri).await?; + } + Ok::<(), EventProcessorError>(()) + } ); indexing_results.0?; diff --git a/nexus-watcher/tests/event_processor/tags/del_idempotent.rs b/nexus-watcher/tests/event_processor/tags/del_idempotent.rs new file mode 100644 index 000000000..9bebd2631 --- /dev/null +++ b/nexus-watcher/tests/event_processor/tags/del_idempotent.rs @@ -0,0 +1,178 @@ +use super::utils::find_post_tag; +use crate::event_processor::posts::utils::find_post_counts; +use crate::event_processor::users::utils::find_user_counts; +use crate::event_processor::utils::watcher::WatcherTest; +use anyhow::Result; +use nexus_common::db::OperationOutcome; +use nexus_common::models::post::PostCounts; +use nexus_common::models::tag::post::TagPost; +use nexus_common::models::tag::traits::{TagCollection, TaggersCollection}; +use nexus_common::models::user::UserCounts; +use nexus_watcher::events::handlers; +use pubky::Keypair; +use pubky_app_specs::{PubkyAppPost, PubkyAppUser, PubkyId}; + +/// Use indexed_at far in the past so TAGGED edges don't interfere with +/// hot-tags tests that query this_month/today timeframes. +const OLD_INDEXED_AT: i64 = 1_000_000; + +/// Simulate a retry of tag del after a partial failure where Redis cleanup +/// succeeded but graph deletion failed. On retry, counters must NOT be +/// decremented again (guarded by the tagger set membership check). +#[tokio_shared_rt::test(shared)] +async fn test_tag_post_del_retry_no_double_decrement() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + // Create users + post through the watcher (no TAGGED edges) + let tagger_kp = Keypair::random(); + let tagger = PubkyAppUser { + bio: Some("test_tag_post_del_retry_no_double_decrement".to_string()), + image: None, + links: None, + name: "Watcher:TagDelRetry:Tagger".to_string(), + status: None, + }; + let tagger_id = test.create_user(&tagger_kp, &tagger).await?; + + let author_kp = Keypair::random(); + let author = PubkyAppUser { + bio: Some("test_tag_post_del_retry_no_double_decrement".to_string()), + image: None, + links: None, + name: "Watcher:TagDelRetry:Author".to_string(), + status: None, + }; + let author_id = test.create_user(&author_kp, &author).await?; + + let post = PubkyAppPost { + content: "Watcher:TagDelRetry:Post".to_string(), + kind: PubkyAppPost::default().kind, + parent: None, + embed: None, + attachments: None, + }; + let (post_id, post_path) = test.create_post(&author_kp, &post).await?; + + // Set up tag state directly via model methods (old timestamp avoids hot-tags interference) + let label = "retry_idem_test"; + let tag_id = "TEST_TAG_RETRY_01"; + let outcome = TagPost::put_to_graph( + &tagger_id, + &author_id, + Some(&post_id), + tag_id, + label, + OLD_INDEXED_AT, + ) + .await?; + assert!(matches!(outcome, OperationOutcome::CreatedOrDeleted)); + + // Populate Redis state: tagger set + counters + TagPost::add_tagger_to_index(&author_id, Some(&post_id), &tagger_id, label).await?; + UserCounts::increment(&tagger_id, "tagged", None).await?; + PostCounts::increment_index_field(&[&author_id, &post_id], "tags", None).await?; + + // Verify initial state + assert_eq!(find_user_counts(&tagger_id).await.tagged, 1); + assert_eq!(find_post_counts(&author_id, &post_id).await.tags, 1); + + // Simulate partial completion of a previous del attempt: + // Remove tagger from Redis set + decrement counters (as if Redis ops completed but graph delete failed) + let tag_post = TagPost(vec![tagger_id.clone()]); + tag_post + .del_from_index(&author_id, Some(&post_id), label) + .await?; + UserCounts::decrement(&tagger_id, "tagged", None).await?; + PostCounts::decrement_index_field(&[&author_id, &post_id], "tags", None).await?; + + // Verify simulated state: graph still has TAGGED edge, Redis tagger set empty, counters at 0 + let post_tag = find_post_tag(&author_id, &post_id, label).await?; + assert!(post_tag.is_some(), "Graph should still have the tag edge"); + assert_eq!(find_user_counts(&tagger_id).await.tagged, 0); + assert_eq!(find_post_counts(&author_id, &post_id).await.tags, 0); + + // Retry: call del handler directly — should delete graph without double-decrement + let tagger_pubky_id = PubkyId::try_from(tagger_id.as_str()).map_err(anyhow::Error::msg)?; + handlers::tag::del(tagger_pubky_id, tag_id.to_string()).await?; + + // Verify final state: graph edge deleted, counters still 0 + let post_tag = find_post_tag(&author_id, &post_id, label).await?; + assert!(post_tag.is_none(), "Graph tag edge should be deleted"); + assert_eq!(find_user_counts(&tagger_id).await.tagged, 0); + assert_eq!(find_post_counts(&author_id, &post_id).await.tags, 0); + + // Cleanup + test.cleanup_post(&author_kp, &post_path).await?; + test.cleanup_user(&tagger_kp).await?; + test.cleanup_user(&author_kp).await?; + + Ok(()) +} + +/// After a fully successful delete, a replay of tag del should return Ok +#[tokio_shared_rt::test(shared)] +async fn test_tag_post_del_replay_after_success_skips() -> Result<()> { + let mut test = WatcherTest::setup().await?; + + let tagger_kp = Keypair::random(); + let tagger = PubkyAppUser { + bio: Some("test_tag_post_del_replay_after_success_skips".to_string()), + image: None, + links: None, + name: "Watcher:TagDelReplay:Tagger".to_string(), + status: None, + }; + let tagger_id = test.create_user(&tagger_kp, &tagger).await?; + + let author_kp = Keypair::random(); + let author = PubkyAppUser { + bio: Some("test_tag_post_del_replay_after_success_skips".to_string()), + image: None, + links: None, + name: "Watcher:TagDelReplay:Author".to_string(), + status: None, + }; + let author_id = test.create_user(&author_kp, &author).await?; + + let post = PubkyAppPost { + content: "Watcher:TagDelReplay:Post".to_string(), + kind: PubkyAppPost::default().kind, + parent: None, + embed: None, + attachments: None, + }; + let (post_id, post_path) = test.create_post(&author_kp, &post).await?; + + // Set up tag directly via model, then delete via handler + let label = "replay_skip_test"; + let tag_id = "TEST_TAG_REPLAY_01"; + TagPost::put_to_graph( + &tagger_id, + &author_id, + Some(&post_id), + tag_id, + label, + OLD_INDEXED_AT, + ) + .await?; + TagPost::add_tagger_to_index(&author_id, Some(&post_id), &tagger_id, label).await?; + UserCounts::increment(&tagger_id, "tagged", None).await?; + + // First delete via handler (should succeed) + let tagger_pubky_id = PubkyId::try_from(tagger_id.as_str()).map_err(anyhow::Error::msg)?; + handlers::tag::del(tagger_pubky_id.clone(), tag_id.to_string()).await?; + + // Verify fully deleted state + let post_tag = find_post_tag(&author_id, &post_id, label).await?; + assert!(post_tag.is_none()); + + // Replay: call del again — should succeed as idempotent no-op + handlers::tag::del(tagger_pubky_id, tag_id.to_string()).await?; + + // Cleanup + test.cleanup_post(&author_kp, &post_path).await?; + test.cleanup_user(&tagger_kp).await?; + test.cleanup_user(&author_kp).await?; + + Ok(()) +} diff --git a/nexus-watcher/tests/event_processor/tags/mod.rs b/nexus-watcher/tests/event_processor/tags/mod.rs index 258e2bb5f..be1e75bb6 100644 --- a/nexus-watcher/tests/event_processor/tags/mod.rs +++ b/nexus-watcher/tests/event_processor/tags/mod.rs @@ -1,3 +1,4 @@ +mod del_idempotent; mod fail_index; mod multi_user; mod post_del; diff --git a/nexus-watcher/tests/event_processor/tags/retry_post_tag.rs b/nexus-watcher/tests/event_processor/tags/retry_post_tag.rs index 86adf46c0..38987a0d8 100644 --- a/nexus-watcher/tests/event_processor/tags/retry_post_tag.rs +++ b/nexus-watcher/tests/event_processor/tags/retry_post_tag.rs @@ -83,27 +83,5 @@ async fn test_homeserver_post_tag_event_to_queue() -> Result<()> { test.del(&tagger_kp, &tag_path).await?; - let del_index_key = format!( - "{}:{}", - EventType::Del, - RetryEvent::generate_index_key(&tag_absolute_url).unwrap() - ); - - assert_eventually_exists(&del_index_key).await; - - let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); - assert!(timestamp.is_some()); - - let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); - assert!(event_retry.is_some()); - - let event_state = event_retry.unwrap(); - assert_eq!(event_state.retry_count, 0); - - match event_state.error_type { - EventProcessorError::SkipIndexing => (), - _ => panic!("The error type has to be SkipIndexing type"), - }; - Ok(()) } diff --git a/nexus-watcher/tests/event_processor/tags/retry_user_tag.rs b/nexus-watcher/tests/event_processor/tags/retry_user_tag.rs index 858c8a720..fc5d28611 100644 --- a/nexus-watcher/tests/event_processor/tags/retry_user_tag.rs +++ b/nexus-watcher/tests/event_processor/tags/retry_user_tag.rs @@ -76,27 +76,5 @@ async fn test_homeserver_user_tag_event_to_queue() -> Result<()> { test.del(&tagger_kp, &tag_path).await?; - let del_index_key = format!( - "{}:{}", - EventType::Del, - RetryEvent::generate_index_key(&tag_absolute_url).unwrap() - ); - - assert_eventually_exists(&del_index_key).await; - - let timestamp = RetryEvent::check_uri(&del_index_key).await.unwrap(); - assert!(timestamp.is_some()); - - let event_retry = RetryEvent::get_from_index(&del_index_key).await.unwrap(); - assert!(event_retry.is_some()); - - let event_state = event_retry.unwrap(); - assert_eq!(event_state.retry_count, 0); - - match event_state.error_type { - EventProcessorError::SkipIndexing => (), - _ => panic!("The error type has to be SkipIndexing type"), - }; - Ok(()) }