Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 2 additions & 48 deletions nexus-common/src/models/homeserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ use crate::db::kv::RedisError;
use crate::db::kv::RedisResult;
use crate::db::queries;
use crate::db::GraphResult;
use crate::db::{PubkyConnector, RedisOps};
use crate::db::RedisOps;
use crate::models::error::ModelError;
use crate::models::error::ModelResult;
use crate::models::user::UserDetails;

use pubky::PublicKey;
use pubky_app_specs::ParsedUri;
use pubky_app_specs::PubkyId;
use serde::{Deserialize, Serialize};
use tracing::info;
Expand Down Expand Up @@ -106,7 +102,7 @@ impl Homeserver {
Ok(())
}

/// Returns all HS IDs with at least one active user, sorted by user count descending.
///
/// # Returns
/// A list of active homeserver IDs.
Expand All @@ -116,48 +112,6 @@ impl Homeserver {
let hs_ids: Vec<String> = maybe_hs_ids.unwrap_or_default();
Ok(hs_ids)
}

/// Triggers homeserver ingestion for the author of a referenced post.
///
/// This is a thin wrapper: it extracts the author ID from the URI and delegates to
/// [`Self::maybe_ingest_for_user`], returning whatever that call returns.
///
/// ### Arguments
///
/// - `referenced_post_uri`: The parent post (if current post is a reply to it), or a reposted post (if current post is a Repost)
pub async fn maybe_ingest_for_post(referenced_post_uri: &ParsedUri) -> ModelResult<()> {
    // The author of the referenced post is the `user_id` component of its URI.
    Self::maybe_ingest_for_user(referenced_post_uri.user_id.as_str()).await
}

/// If a referenced user is using a new, unknown homeserver, this method triggers ingestion of that homeserver.
///
/// ### Arguments
///
/// - `referenced_user_id`: The URI of the referenced user
#[tracing::instrument(name = "homeserver.ingest", skip_all)]
pub async fn maybe_ingest_for_user(referenced_user_id: &str) -> ModelResult<()> {
let pubky = PubkyConnector::get().map_err(ModelError::from_generic)?;

if UserDetails::get_by_id(referenced_user_id).await?.is_some() {
tracing::debug!(
"Skipping homeserver ingestion: author {referenced_user_id} already known"
);
return Ok(());
}

let ref_post_author_pk = referenced_user_id
.parse::<PublicKey>()
.map_err(ModelError::from_generic)?;
let Some(ref_post_author_hs) = pubky.get_homeserver_of(&ref_post_author_pk).await else {
tracing::warn!("Skipping homeserver ingestion: author {ref_post_author_pk} has no published homeserver");
return Ok(());
};

let hs_pk = PubkyId::from(ref_post_author_hs);
Self::persist_if_unknown(hs_pk.clone())
.await
.inspect(|_| tracing::info!("Ingested homeserver {hs_pk}"))
.inspect_err(|e| tracing::error!("Failed to ingest homeserver {hs_pk}: {e}"))
}
}

#[cfg(test)]
Expand Down
14 changes: 13 additions & 1 deletion nexus-common/src/models/post/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use crate::db::{
OperationOutcome, RedisOps,
};
use crate::models::error::ModelResult;
use crate::models::user::UserDetails;
use chrono::Utc;
use pubky_app_specs::{post_uri_builder, PubkyAppPost, PubkyAppPostKind, PubkyId};
use pubky_app_specs::{post_uri_builder, ParsedUri, PubkyAppPost, PubkyAppPostKind, PubkyId};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

Expand Down Expand Up @@ -164,4 +165,15 @@ impl PostDetails {
}
Ok(())
}

/// Triggers ingestion of the author of a referenced post.
///
/// This is a thin wrapper: it extracts the author ID from the URI and delegates to
/// `UserDetails::maybe_ingest_user`, returning whatever that call returns.
///
/// ### Arguments
///
/// - `referenced_post_uri`: The parent post (if current post is a reply to it), or a reposted post (if current post is a Repost)
pub async fn maybe_ingest_author_of_post(referenced_post_uri: &ParsedUri) -> ModelResult<()> {
    // The author of the referenced post is the `user_id` component of its URI.
    UserDetails::maybe_ingest_user(referenced_post_uri.user_id.as_str()).await
}
}
61 changes: 59 additions & 2 deletions nexus-common/src/models/user/details.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use super::UserSearch;
use crate::db::graph::Query;
use crate::db::kv::RedisResult;
use crate::db::{exec_single_row, queries, GraphResult, RedisOps};
use crate::models::error::ModelResult;
use crate::db::{exec_single_row, queries, GraphResult, PubkyConnector, RedisOps};
use crate::models::error::{ModelError, ModelResult};
use crate::models::traits::Collection;
use async_trait::async_trait;
use chrono::Utc;
use pubky::PublicKey;
use pubky_app_specs::{PubkyAppUser, PubkyAppUserLink, PubkyId};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json;
use utoipa::ToSchema;

/// Leading key parts of the per-user sorted-set index that stores a user's homeserver cursor.
/// `UserDetails::maybe_ingest_user` appends the user ID as the final key part.
pub const USER_HS_CURSOR: [&str; 2] = ["Users", "Homeservers"];

// `UserDetails` implements `RedisOps` with an empty body, i.e. without overriding any trait item.
#[async_trait]
impl RedisOps for UserDetails {}

Expand Down Expand Up @@ -90,6 +93,17 @@ impl UserDetails {
Ok(details_collection.into_iter().flatten().next())
}

/// Creates a minimal `UserDetails` stub from a public key alone.
///
/// - `id` is the given key.
/// - `name` is the key's string form, used as a placeholder.
/// - `indexed_at` is the current time in milliseconds.
/// - Every remaining field takes its `Default` value.
pub fn from_pubky(user_id: PubkyId) -> Self {
    UserDetails {
        // Field initialisers run in source order, so borrow for `name` first and then move
        // the owned key into `id`; no clone is needed.
        name: user_id.to_string(),
        id: user_id,
        indexed_at: Utc::now().timestamp_millis(),
        ..Default::default()
    }
}

pub fn from_homeserver(homeserver_user: PubkyAppUser, user_id: &PubkyId) -> Self {
UserDetails {
name: homeserver_user.name,
Expand All @@ -110,6 +124,49 @@ impl UserDetails {

Ok(())
}

/// If a referenced user is unknown, not ingested in the graph yet, resolves their homeserver
/// and persists the user node in the graph.
#[tracing::instrument(name = "user.ingest", skip_all)]
pub async fn maybe_ingest_user(user_id: &str) -> ModelResult<()> {
if Self::get_by_id(user_id).await?.is_some() {
tracing::debug!("Skipping user ingestion: {user_id} already known");
return Ok(());
}

let pubky = PubkyConnector::get().map_err(ModelError::from_generic)?;

let user_pk = user_id
.parse::<PublicKey>()
.map_err(ModelError::from_generic)?;

let Some(hs_pk) = pubky.get_homeserver_of(&user_pk).await else {
tracing::warn!(
"Skipping user ingestion: {user_id} has no published homeserver or it's a homeserver pubky"
);
return Ok(());
};

let pubky_id =
PubkyId::try_from(&user_pk.into_inner().to_z32()).map_err(ModelError::from_generic)?;
let user_details = Self::from_pubky(pubky_id);

let hs_id = &hs_pk.into_inner().to_z32();

// Do not add to index, as this would affect the timeline of events for this user.
// Only create stub graph node for HS-resolver to store user-HS mapping.
user_details
.put_to_graph()
Comment thread
tipogi marked this conversation as resolved.
.await
.inspect(|_| tracing::info!("Ingested user {user_id} from homeserver {hs_id}"))
.inspect_err(|e| tracing::error!("Failed to ingest user {user_id}: {e}"))?;

// Store the start point of the homeserver cursor
let key = &[&USER_HS_CURSOR[..], &[user_id]].concat();
Self::put_index_sorted_set(key, &[(0.0, hs_id)], None, None).await?;

Ok(())
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion nexus-common/src/models/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod tags;
mod view;

pub use counts::UserCounts;
pub use details::UserDetails;
pub use details::{UserDetails, USER_HS_CURSOR};
pub use influencers::Influencers;
pub use relationship::Relationship;
pub use search::{UserSearch, USER_NAME_KEY_PARTS};
Expand Down
6 changes: 3 additions & 3 deletions nexus-watcher/src/events/handlers/follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::events::EventProcessorError;
use nexus_common::db::kv::JsonAction;
use nexus_common::db::OperationOutcome;
use nexus_common::models::follow::{Followers, Following, Friends, UserFollows};
use nexus_common::models::homeserver::Homeserver;
use nexus_common::models::notification::Notification;
use nexus_common::models::user::UserCounts;
use nexus_common::models::user::UserDetails;
use pubky_app_specs::PubkyId;
use tracing::debug;

Expand All @@ -22,8 +22,8 @@ pub async fn sync_put(
// Do not duplicate the follow relationship
OperationOutcome::Updated => return Ok(()),
OperationOutcome::MissingDependency => {
if let Err(e) = Homeserver::maybe_ingest_for_user(followee_id.as_str()).await {
tracing::error!("Failed to ingest homeserver: {e}");
if let Err(e) = UserDetails::maybe_ingest_user(followee_id.as_str()).await {
tracing::error!("Failed to ingest user: {e}");
}

let key = RetryEvent::generate_index_key_from_uri(&followee_id.to_uri());
Expand Down
15 changes: 7 additions & 8 deletions nexus-watcher/src/events/handlers/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use crate::events::EventProcessorError;
use nexus_common::db::queries::get::post_is_safe_to_delete;
use nexus_common::db::{exec_single_row, execute_graph_operation, OperationOutcome};
use nexus_common::db::{queries, RedisOps};
use nexus_common::models::homeserver::Homeserver;
use nexus_common::models::notification::{Notification, PostChangedSource, PostChangedType};
use nexus_common::models::post::{
PostCounts, PostDetails, PostRelationships, PostStream, POST_TOTAL_ENGAGEMENT_KEY_PARTS,
};
use nexus_common::models::user::UserCounts;
use nexus_common::models::user::{UserCounts, UserDetails};
use pubky_app_specs::{
post_uri_builder, ParsedUri, PubkyAppPost, PubkyAppPostKind, PubkyId, Resource,
};
Expand Down Expand Up @@ -40,16 +39,16 @@ pub async fn sync_put(
let reply_dependency = RetryEvent::generate_index_key_from_uri(replied_to_uri);
dependency_event_keys.push(reply_dependency);

if let Err(e) = Homeserver::maybe_ingest_for_post(replied_to_uri).await {
tracing::error!("Failed to ingest homeserver: {e}");
if let Err(e) = PostDetails::maybe_ingest_author_of_post(replied_to_uri).await {
tracing::error!("Failed to ingest user: {e}");
}
}
if let Some(reposted_uri) = &post_relationships.reposted {
let reply_dependency = RetryEvent::generate_index_key_from_uri(reposted_uri);
dependency_event_keys.push(reply_dependency);

if let Err(e) = Homeserver::maybe_ingest_for_post(reposted_uri).await {
tracing::error!("Failed to ingest homeserver: {e}");
if let Err(e) = PostDetails::maybe_ingest_author_of_post(reposted_uri).await {
tracing::error!("Failed to ingest user: {e}");
}
}
if dependency_event_keys.is_empty() {
Expand Down Expand Up @@ -87,8 +86,8 @@ pub async fn sync_put(
// We only consider the first mentioned (tagged) user, to mitigate DoS attacks against Nexus
// whereby posts with many (inexistent) tagged PKs can cause Nexus to spend a lot of time trying to resolve them
if let Some(mentioned_user_id) = &post_relationships.mentioned.first() {
if let Err(e) = Homeserver::maybe_ingest_for_user(mentioned_user_id).await {
tracing::error!("Failed to ingest homeserver: {e}");
if let Err(e) = UserDetails::maybe_ingest_user(mentioned_user_id).await {
tracing::error!("Failed to ingest user: {e}");
}
}

Expand Down
11 changes: 6 additions & 5 deletions nexus-watcher/src/events/handlers/tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use crate::events::EventProcessorError;
use chrono::Utc;
use nexus_common::db::kv::ScoreAction;
use nexus_common::db::OperationOutcome;
use nexus_common::models::homeserver::Homeserver;
use nexus_common::models::notification::Notification;
use nexus_common::models::post::search::PostsByTagSearch;
use nexus_common::models::post::PostDetails;
use nexus_common::models::post::{PostCounts, PostStream};
use nexus_common::models::tag::post::TagPost;
use nexus_common::models::tag::search::TagSearch;
use nexus_common::models::tag::traits::{TagCollection, TaggersCollection};
use nexus_common::models::tag::user::TagUser;
use nexus_common::models::user::UserCounts;
use nexus_common::models::user::UserDetails;
use nexus_common::types::Pagination;
use pubky_app_specs::{post_uri_builder, ParsedUri, PubkyAppTag, PubkyId, Resource};
use tracing::debug;
Expand Down Expand Up @@ -83,8 +84,8 @@ async fn put_sync_post(
// Ensure that dependencies follow the same format as the RetryManager keys
let dependency = vec![format!("{author_id}:posts:{post_id}")];
if let Ok(referenced_post_uri) = ParsedUri::try_from(post_uri) {
if let Err(e) = Homeserver::maybe_ingest_for_post(&referenced_post_uri).await {
tracing::error!("Failed to ingest homeserver: {e}");
if let Err(e) = PostDetails::maybe_ingest_author_of_post(&referenced_post_uri).await {
tracing::error!("Failed to ingest user: {e}");
}
}
Err(EventProcessorError::MissingDependency { dependency })
Expand Down Expand Up @@ -188,8 +189,8 @@ async fn put_sync_user(
{
OperationOutcome::Updated => Ok(()),
OperationOutcome::MissingDependency => {
if let Err(e) = Homeserver::maybe_ingest_for_user(tagged_user_id.as_str()).await {
tracing::error!("Failed to ingest homeserver: {e}");
if let Err(e) = UserDetails::maybe_ingest_user(tagged_user_id.as_str()).await {
tracing::error!("Failed to ingest user: {e}");
}

let key = RetryEvent::generate_index_key_from_uri(&tagged_user_id.to_uri());
Expand Down
8 changes: 0 additions & 8 deletions nexus-watcher/tests/event_processor/homeserver/utils.rs

This file was deleted.

1 change: 0 additions & 1 deletion nexus-watcher/tests/event_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod bookmarks;
mod files;
mod follows;
mod homeserver;
mod mentions;
mod network;
mod posts;
Expand Down
1 change: 1 addition & 0 deletions nexus-watcher/tests/homeserver/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod active_homeservers;
2 changes: 2 additions & 0 deletions nexus-watcher/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mod event_processor;
mod homeserver;
mod service;
mod user_ingestion;
Original file line number Diff line number Diff line change
@@ -1,46 +1,34 @@
use crate::event_processor::{
homeserver::utils::create_external_test_homeserver, utils::watcher::WatcherTest,
};
use super::utils::{assert_user_ingested, create_external_test_homeserver};
use crate::event_processor::utils::watcher::WatcherTest;
use anyhow::Result;
use nexus_common::models::homeserver::Homeserver;
use pubky::Keypair;
use pubky_app_specs::{PubkyAppUser, PubkyId};
use pubky_app_specs::PubkyAppUser;

/// Following a user registered on a separate, external homeserver should result in that
/// followee being ingested.
#[tokio_shared_rt::test(shared)]
async fn test_follow_on_unknown_homeserver() -> Result<()> {
    let mut test = WatcherTest::setup().await?;

    // Create a separate homeserver for the followee
    let followee_hs_pk = create_external_test_homeserver(&mut test).await?;

    // Create followee
    let followee_kp = Keypair::random();
    let followee_id = followee_kp.public_key().to_z32();

    // Register the followee PK in the new homeserver
    // We only need the record mapping, not necessarily the profile.json being uploaded
    test.register_user_in_hs(&followee_kp, &followee_hs_pk)
        .await?;

    // Create follower user
    let follower_kp = Keypair::random();
    let follower_user = PubkyAppUser {
        bio: Some("test_follow_on_unknown_homeserver".to_string()),
        image: None,
        links: None,
        name: "Watcher:UserIngestion:Follow".to_string(),
        status: None,
    };
    let _follower_id = test.create_user(&follower_kp, &follower_user).await?;

    // Follow the followee
    test.create_follow(&follower_kp, &followee_id).await?;

    // The follow event should have triggered ingestion of the previously unknown followee
    assert_user_ingested(&followee_id, &followee_hs_pk).await;

    Ok(())
}
Loading