Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions nexus-common/src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ pub fn get_post_replies(author_id: &str, post_id: &str) -> Query {
.param("post_id", post_id)
}

// Read the target details for a tag without deleting the TAGGED edge.
// Used in tag del to read before graph-last deletion.
pub fn get_tag_target(user_id: &str, tag_id: &str) -> Query {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost identical (1 line diff) to delete_tag in del.rs. This means any future logic change in the query has to reflect equally across both methods.

Maybe there's a way to avoid query duplication? Like have both methods internally use the same "query builder fn" with one boolean flag include_delete_call?

Alternatively, both methods should have a comment pointing at each other, saying "query logic must be kept in-sync with (other fn)". Although this is more error prone than the 1st suggestion.

Query::new(
"get_tag_target",
"MATCH (user:User {id: $user_id})-[tag:TAGGED {id: $tag_id}]->(target)
OPTIONAL MATCH (target)<-[:AUTHORED]-(author:User)
WITH CASE WHEN target:User THEN target.id ELSE null END AS user_id,
CASE WHEN target:Post THEN target.id ELSE null END AS post_id,
CASE WHEN target:Post THEN author.id ELSE null END AS author_id,
tag.label AS label
RETURN user_id, post_id, author_id, label",
)
.param("user_id", user_id)
.param("tag_id", tag_id)
}

// Get all the tags/taggers that a post has received (used for edit/delete notifications)
pub fn get_post_tags(author_id: &str, post_id: &str) -> Query {
Query::new(
Expand Down
20 changes: 20 additions & 0 deletions nexus-common/src/models/tag/traits/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,26 @@ where
Ok(Some((user_id, post_id, author_id, label)))
}

/// Reads tag target details from the graph without deleting the TAGGED edge.
/// Same return shape as `del_from_graph`.
async fn get_target_from_graph(
user_id: &str,
tag_id: &str,
) -> GraphResult<Option<(Option<String>, Option<String>, Option<String>, String)>> {
let query = queries::get::get_tag_target(user_id, tag_id);
let maybe_row = fetch_row_from_graph(query).await?;

let Some(row) = maybe_row else {
return Ok(None);
};

let user_id: Option<String> = row.get("user_id").unwrap_or(None);
let author_id: Option<String> = row.get("author_id").unwrap_or(None);
let post_id: Option<String> = row.get("post_id").unwrap_or(None);
let label: String = row.get("label").expect("Query should return tag label");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this shouldn't panic, but instead throw a GraphError?

Ok(Some((user_id, post_id, author_id, label)))
}

/// Returns the unique key parts used to identify a tag in the Redis database
fn get_tag_prefix<'a>() -> [&'a str; 2];

Expand Down
197 changes: 143 additions & 54 deletions nexus-watcher/src/events/handlers/tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::events::EventProcessorError;

use chrono::Utc;
use nexus_common::db::kv::ScoreAction;
use nexus_common::db::OperationOutcome;
use nexus_common::db::{OperationOutcome, RedisOps};
use nexus_common::models::homeserver::Homeserver;
use nexus_common::models::notification::Notification;
use nexus_common::models::post::search::PostsByTagSearch;
Expand Down Expand Up @@ -78,7 +78,20 @@ async fn put_sync_post(
)
.await?
{
OperationOutcome::Updated => Ok(()),
OperationOutcome::Updated => {
// Re-run idempotent ops to recover from partial failure (graph wrote, Redis didn't)
let tag_label_slice = &[tag_label.to_string()];
let idempotent_results = nexus_common::traced_join!(
tracing::info_span!("index.write", phase = "tag_post_retry");
TagPost::add_tagger_to_index(&author_id, Some(post_id), &tagger_user_id, tag_label),
PostsByTagSearch::put_to_index(&author_id, post_id, tag_label),
TagSearch::put_to_index(tag_label_slice)
);
idempotent_results.0?;
idempotent_results.1?;
idempotent_results.2?;
Ok(())
}
OperationOutcome::MissingDependency => {
// Ensure that dependencies follow the same format as the RetryManager keys
let dependency = vec![format!("{author_id}:posts:{post_id}")];
Expand Down Expand Up @@ -186,7 +199,18 @@ async fn put_sync_user(
)
.await?
{
OperationOutcome::Updated => Ok(()),
OperationOutcome::Updated => {
// Re-run idempotent ops to recover from partial failure (graph wrote, Redis didn't)
let tag_label_slice = &[tag_label.to_string()];
let idempotent_results = nexus_common::traced_join!(
tracing::info_span!("index.write", phase = "tag_user_retry");
TagUser::add_tagger_to_index(&tagged_user_id, None, &tagger_user_id, tag_label),
TagSearch::put_to_index(tag_label_slice)
);
idempotent_results.0?;
idempotent_results.1?;
Ok(())
}
OperationOutcome::MissingDependency => {
if let Err(e) = Homeserver::maybe_ingest_for_user(tagged_user_id.as_str()).await {
tracing::error!("Failed to ingest homeserver: {e}");
Expand Down Expand Up @@ -238,57 +262,97 @@ async fn put_sync_user(
#[tracing::instrument(name = "tag.del", skip_all, fields(user_id = %user_id, tag_id = %tag_id))]
pub async fn del(user_id: PubkyId, tag_id: String) -> Result<(), EventProcessorError> {
debug!("Deleting tag: {} -> {}", user_id, tag_id);
let tag_details = TagUser::del_from_graph(&user_id, &tag_id).await?;
// CHOOSE THE EVENT TYPE

// 1. Read target from graph WITHOUT deleting the edge
let tag_details = TagUser::get_target_from_graph(&user_id, &tag_id).await?;

if let Some((tagged_user_id, post_id, author_id, label)) = tag_details {
match (tagged_user_id, post_id, author_id) {
// Delete user related indexes
(Some(tagged_id), None, None) => {
del_sync_user(user_id, &tagged_id, &label).await?;
// Guard: check if tagger is still in Redis set (not yet cleaned on a prior attempt)
let tagger_in_index =
TagUser::check_set_member(&[tagged_id.as_str(), label.as_str()], &user_id)
.await?
.1;
Comment on lines +274 to +277
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let tagger_in_index =
TagUser::check_set_member(&[tagged_id.as_str(), label.as_str()], &user_id)
.await?
.1;
let (_, tagger_in_index) =
TagUser::check_set_member(&[&tagged_id, &label], &user_id).await?;

del_sync_user(user_id.clone(), &tagged_id, &label, tagger_in_index).await?;
}
// Delete post related indexes
(None, Some(post_id), Some(author_id)) => {
del_sync_post(user_id, &post_id, &author_id, &label).await?;
let tagger_in_index = TagPost::check_set_member(
&[author_id.as_str(), post_id.as_str(), label.as_str()],
&user_id,
)
.await?
.1;
Comment on lines +282 to +287
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let tagger_in_index = TagPost::check_set_member(
&[author_id.as_str(), post_id.as_str(), label.as_str()],
&user_id,
)
.await?
.1;
let (_, tagger_in_index) =
TagPost::check_set_member(&[&author_id, &post_id, &label], &user_id).await?;

del_sync_post(
user_id.clone(),
&post_id,
&author_id,
&label,
tagger_in_index,
)
.await?;
}
// Handle other unexpected cases
_ => {
debug!("DEL-Tag: Unexpected combination of tag details");
}
}
} else {
return Err(EventProcessorError::SkipIndexing);

// 3. Graph deletion LAST — ensures data survives for retry if Redis ops fail
TagUser::del_from_graph(&user_id, &tag_id).await?;
}
// Edge already gone (fully completed on a prior attempt) — idempotent no-op
Ok(())
}

async fn del_sync_user(
tagger_id: PubkyId,
tagged_id: &str,
tag_label: &str,
tagger_in_index: bool,
) -> Result<(), EventProcessorError> {
let indexing_results = nexus_common::traced_join!(
tracing::info_span!("index.delete", phase = "tag_user");
// Update user counts in the tagged
UserCounts::decrement(tagged_id, "tags", None),
// Update user counts in the tagger
UserCounts::decrement(&tagger_id, "tagged", None),
// Guarded: Update user counts in the tagged
async {
// Decrement label count to the user profile tag
TagUser::update_index_score(tagged_id, None, tag_label, ScoreAction::Decrement(1.0)).await?;
// Decrease unique_tags
// NOTE: To update that field, we first need to decrement the value in the TagUser SORTED SET associated with that tag
UserCounts::decrement(tagged_id, "unique_tags", Some(tag_label)).await?;
if tagger_in_index {
UserCounts::decrement(tagged_id, "tags", None).await?;
}
Ok::<(), EventProcessorError>(())
},
// Guarded: Update user counts in the tagger
async {
// Remove tagger to the user taggers list
if tagger_in_index {
UserCounts::decrement(&tagger_id, "tagged", None).await?;
}
Ok::<(), EventProcessorError>(())
},
async {
if tagger_in_index {
// Decrement label count to the user profile tag
TagUser::update_index_score(tagged_id, None, tag_label, ScoreAction::Decrement(1.0)).await?;
// Decrease unique_tags
// NOTE: To update that field, we first need to decrement the value in the TagUser SORTED SET associated with that tag
UserCounts::decrement(tagged_id, "unique_tags", Some(tag_label)).await?;
}
Ok::<(), EventProcessorError>(())
},
async {
// Idempotent: Remove tagger from the user taggers list (SREM)
TagUser(vec![tagger_id.to_string()])
.del_from_index(tagged_id, None, tag_label)
.await?;
Ok::<(), EventProcessorError>(())
},
// Save new notification
Notification::new_user_untag(&tagger_id, tagged_id, tag_label)
// Guarded: notification
async {
if tagger_in_index {
Notification::new_user_untag(&tagger_id, tagged_id, tag_label).await?;
}
Ok::<(), EventProcessorError>(())
}
);

indexing_results.0?;
Expand All @@ -305,56 +369,76 @@ async fn del_sync_post(
post_id: &str,
author_id: &str,
tag_label: &str,
tagger_in_index: bool,
) -> Result<(), EventProcessorError> {
// SAVE TO INDEXES
let post_key_slice: &[&str] = &[author_id, post_id];
let tag_post = TagPost(vec![tagger_id.to_string()]);
let post_uri = post_uri_builder(author_id.to_string(), post_id.to_string());

let indexing_results = nexus_common::traced_join!(
tracing::info_span!("index.delete", phase = "tag_post");
// Update user counts for tagger
UserCounts::decrement(&tagger_id, "tagged", None),
// Decrement in one the post tags
PostCounts::decrement_index_field(post_key_slice, "tags", None),
// Guarded: Update user counts for tagger
async {
// Decrement label score in the post
TagPost::update_index_score(
author_id,
Some(post_id),
tag_label,
ScoreAction::Decrement(1.0),
)
.await?;
// Decrease unique_tag
// NOTE: To update that field, we first need to decrement the value in the SORTED SET associated with that tag
PostCounts::decrement_index_field(post_key_slice, "unique_tags", Some(tag_label))
if tagger_in_index {
UserCounts::decrement(&tagger_id, "tagged", None).await?;
}
Ok::<(), EventProcessorError>(())
},
// Guarded: Decrement in one the post tags
async {
if tagger_in_index {
PostCounts::decrement_index_field(post_key_slice, "tags", None).await?;
}
Ok::<(), EventProcessorError>(())
},
async {
if tagger_in_index {
// Decrement label score in the post
TagPost::update_index_score(
author_id,
Some(post_id),
tag_label,
ScoreAction::Decrement(1.0),
)
.await?;
// Decrease unique_tag
// NOTE: To update that field, we first need to decrement the value in the SORTED SET associated with that tag
PostCounts::decrement_index_field(post_key_slice, "unique_tags", Some(tag_label))
.await?;
}
Ok::<(), EventProcessorError>(())
},
// Guarded: Decrease post from label total engagement
async {
if tagger_in_index {
PostsByTagSearch::update_index_score(
author_id,
post_id,
tag_label,
ScoreAction::Decrement(1.0),
)
.await?;
}
Ok::<(), EventProcessorError>(())
},
// Decrease post from label total engagement
PostsByTagSearch::update_index_score(
author_id,
post_id,
tag_label,
ScoreAction::Decrement(1.0),
),
async {
// Post replies cannot be included in the total engagement index once the tag have been deleted
if !post_relationships_is_reply(author_id, post_id).await? {
// Decrement in one post global engagement
PostStream::update_index_score(author_id, post_id, ScoreAction::Decrement(1.0))
.await
.map_err(EventProcessorError::index_operation_failed)?;
if tagger_in_index {
// Post replies cannot be included in the total engagement index once the tag have been deleted
if !post_relationships_is_reply(author_id, post_id).await? {
// Decrement in one post global engagement
PostStream::update_index_score(author_id, post_id, ScoreAction::Decrement(1.0))
.await
.map_err(EventProcessorError::index_operation_failed)?;
}
}
Ok::<(), EventProcessorError>(())
},
async {
// Delete the tagger from the tag list
// Idempotent: Delete the tagger from the tag list (SREM)
tag_post
.del_from_index(author_id, Some(post_id), tag_label)
.await?;
// NOTE: The tag search index, depends on the post taggers collection to delete
// NOTE: The tag search index depends on the post taggers collection to delete
// Delete post from global label timeline
PostsByTagSearch::del_from_index(author_id, post_id, tag_label).await?;

Expand All @@ -368,8 +452,13 @@ async fn del_sync_post(

Ok::<(), EventProcessorError>(())
},
// Save new notification
Notification::new_post_untag(&tagger_id, author_id, tag_label, &post_uri)
// Guarded: notification
async {
if tagger_in_index {
Notification::new_post_untag(&tagger_id, author_id, tag_label, &post_uri).await?;
}
Ok::<(), EventProcessorError>(())
}
);

indexing_results.0?;
Expand Down
Loading
Loading