Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
91098c1
feat: initial version of user-hs-resolver task
ok300 Mar 11, 2026
0eb9f17
Remove orphan HS cleanup
Copilot Mar 19, 2026
bb640d7
Introduce `UserHsFailures::get_all`, remove `get`, use in
Copilot Mar 20, 2026
960cb90
Simplify logging
ok300 Mar 20, 2026
6705bc9
Rename user_failures variable
ok300 Mar 20, 2026
1d9a048
Rename `get_all_from_graph` to `get_all_active_from_graph`, exclude o…
Copilot Mar 21, 2026
9e35b56
Add loop comment
ok300 Mar 21, 2026
de11d89
Merge branch 'feat/dx-events-by-user' into ok300-user-hs-resolver
ok300 Mar 21, 2026
be1c20f
Simplify get_all_homeservers query to return rows instead of collecte…
Copilot Mar 21, 2026
a5a76aa
chore: rename get_all_homeservers query
ok300 Mar 22, 2026
777d59c
feat: reduce homeserver resolution frequency with TTL-based filtering…
ok300 Mar 22, 2026
909e95b
Extract `hs_resolver_sleep` into `WatcherConfig` (#97)
Copilot Mar 22, 2026
76ec5c4
Exclude orphan homeservers from get_all_from_graph, remove Homeserver…
Copilot Mar 22, 2026
6d4d640
Remove unused method
ok300 Mar 22, 2026
551a028
Merge remote-tracking branch 'origin/feat/dx-events-by-user' into ok3…
ok300 Mar 23, 2026
ea0d583
Clarify set_user_homeserver query
ok300 Mar 23, 2026
9f956fb
Avoid extra clone() in sort_by_failures
ok300 Mar 23, 2026
46a164e
Mark UserHsFailures::get as only used in tests
ok300 Mar 23, 2026
cf493d8
Remove UserHsFailures and associated Redis Sorted Set (#100)
Copilot Mar 23, 2026
118a704
Add Redis-backed circuit breaker for homeserver health tracking (#102)
ok300 Mar 23, 2026
e7d51e5
Simplify user_hs_resolver.rs
ok300 Mar 23, 2026
9f28dcf
Add default fn for WatcherConfig::hs_resolver_sleep
ok300 Mar 23, 2026
aab729d
Clarify rustdoc of set_user_homeserver query
ok300 Mar 23, 2026
f1c3941
Simplify, clarify rustdocs
ok300 Mar 23, 2026
2fa8e2e
Merge two almost identical active_homeservers.rs tests
ok300 Mar 24, 2026
84c6f4e
Simplify active / orphan tests in active_homeservers.rs
ok300 Mar 24, 2026
43a35ec
Remove circuit breaker logic and tests (#105)
ok300 Mar 24, 2026
cb6a8d1
Merge branch 'feat/dx-events-by-user' into ok300-user-hs-resolver
ok300 Mar 30, 2026
4ef6aec
fix: exclude deleted users from homeserver Neo4j queries
Copilot Mar 30, 2026
6cbcf63
Merge remote-tracking branch 'origin/feat/dx-events-by-user' into
ok300 Apr 3, 2026
c01365d
chore: remove TODO
ok300 Apr 3, 2026
45db64a
doc: clarify ttl
ok300 Apr 3, 2026
8ae8f93
Add TODO for HS naming
ok300 Apr 3, 2026
0fdf196
Remove TODO for HS naming
ok300 Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/watcher/watcher-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ testnet = false
homeserver = "8um71us3fyw6h8wbcxb5ar3rwusy1a6u49956ikzojg3gcwd1dty"
events_limit = 50
watcher_sleep = 5000
hs_resolver_sleep = 10000
# Initial backoff duration (in seconds) after the first failure of a homeserver
initial_backoff_secs = 60
# Maximum backoff duration (in seconds) for a failing homeserver
Expand Down
3 changes: 3 additions & 0 deletions nexus-common/default.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ events_limit = 50
# Maximum number of monitored homeservers. If set to 1, only the default homeserver is monitored.
monitored_homeservers_limit = 50
watcher_sleep = 5000
hs_resolver_sleep = 10000
# Minimum time (ms) before a user's homeserver mapping is considered stale and therefore eligible to be re-resolved (default: 1 hour)
hs_resolver_ttl = 3600000
Comment thread
ok300 marked this conversation as resolved.
# Initial backoff duration (in seconds) after the first failure of a homeserver
initial_backoff_secs = 60
# Maximum backoff duration (in seconds) for a failing homeserver
Expand Down
1 change: 1 addition & 0 deletions nexus-common/src/config/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ mod tests {
);
assert_eq!(c.watcher.events_limit, 50);
assert_eq!(c.watcher.watcher_sleep, 5_000);
assert_eq!(c.watcher.hs_resolver_sleep, 10_000);
assert_eq!(
c.watcher.moderation_id,
PubkyId::try_from("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap()
Expand Down
4 changes: 3 additions & 1 deletion nexus-common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ pub use api::ApiConfig;
pub use daemon::DaemonConfig;
pub use stack::{default_stack, OtlpConfig, StackConfig};
pub use watcher::WatcherConfig;
pub use watcher::{DEFAULT_INITIAL_BACKOFF_SECS, DEFAULT_MAX_BACKOFF_SECS};
pub use watcher::{
DEFAULT_HS_RESOLVER_TTL, DEFAULT_INITIAL_BACKOFF_SECS, DEFAULT_MAX_BACKOFF_SECS,
};

use crate::file::validate_and_expand_path;

Expand Down
29 changes: 29 additions & 0 deletions nexus-common/src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub const DEFAULT_EVENTS_LIMIT: u32 = 1_000;
pub const DEFAULT_MONITORED_HOMESERVERS_LIMIT: usize = 50;
/// Default for [WatcherConfig::watcher_sleep]
pub const DEFAULT_WATCHER_SLEEP: u64 = 5_000;
/// Default for [WatcherConfig::hs_resolver_sleep]
pub const DEFAULT_HS_RESOLVER_SLEEP: u64 = 10_000;
/// Default for [WatcherConfig::hs_resolver_ttl]: 1 hour in milliseconds
pub const DEFAULT_HS_RESOLVER_TTL: u64 = 3_600_000;
/// Default for [WatcherConfig::initial_backoff_secs]
pub const DEFAULT_INITIAL_BACKOFF_SECS: u64 = 60;
/// Default for [WatcherConfig::max_backoff_secs]
Expand All @@ -36,14 +40,28 @@ pub const MODERATED_TAGS: [&str; 6] = [
pub struct WatcherConfig {
pub testnet: bool,
pub testnet_host: String,

/// Default homeserver. Other homeservers may be ingested in addition, but this one is prioritized.
pub homeserver: PubkyId,

/// Maximum number of events to fetch per run from each homeserver
pub events_limit: u32,

/// Maximum number of monitored homeservers
pub monitored_homeservers_limit: usize,

/// Sleep between every full run (over all monitored homeservers), in milliseconds
pub watcher_sleep: u64,

/// Sleep between every run of the user HS resolver periodic task, in milliseconds
#[serde(default = "default_hs_resolver_sleep")]
pub hs_resolver_sleep: u64,

/// Minimum time (ms) before a user's homeserver mapping is re-resolved.
/// Users whose `HOSTED_BY.resolved_at` is newer than this TTL are skipped.
#[serde(default = "default_hs_resolver_ttl")]
pub hs_resolver_ttl: u64,

/// Initial backoff duration (in seconds) after the first failure of a homeserver
#[serde(default = "default_initial_backoff_secs")]
pub initial_backoff_secs: u64,
Expand All @@ -52,6 +70,7 @@ pub struct WatcherConfig {
pub max_backoff_secs: u64,
#[serde(default = "default_stack")]
pub stack: StackConfig,

// Moderation
pub moderation_id: PubkyId,
pub moderated_tags: Vec<String>,
Expand All @@ -74,6 +93,8 @@ impl Default for WatcherConfig {
events_limit: DEFAULT_EVENTS_LIMIT,
monitored_homeservers_limit: DEFAULT_MONITORED_HOMESERVERS_LIMIT,
watcher_sleep: DEFAULT_WATCHER_SLEEP,
hs_resolver_sleep: DEFAULT_HS_RESOLVER_SLEEP,
hs_resolver_ttl: DEFAULT_HS_RESOLVER_TTL,
initial_backoff_secs: DEFAULT_INITIAL_BACKOFF_SECS,
max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS,
moderation_id,
Expand All @@ -82,6 +103,14 @@ impl Default for WatcherConfig {
}
}

fn default_hs_resolver_sleep() -> u64 {
DEFAULT_HS_RESOLVER_SLEEP
}

fn default_hs_resolver_ttl() -> u64 {
DEFAULT_HS_RESOLVER_TTL
}

/// Converts a [`DaemonConfig`] into an [`WatcherConfig`], extracting only the Watcher-related settings
/// and the shared application stack
impl From<DaemonConfig> for WatcherConfig {
Expand Down
46 changes: 40 additions & 6 deletions nexus-common/src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,48 @@ pub fn get_homeserver_by_id(id: &str) -> Query {
.param("id", id)
}

/// Retrieves all homeserver IDs
pub fn get_all_homeservers() -> Query {
/// Retrieves all homeserver IDs that have at least one active user
/// (incoming `HOSTED_BY` relationships from `User` nodes).
///
/// The results are sorted by the number of active users in descending order.
/// Returns a single `homeservers_list` column containing the collected IDs.
pub fn get_all_homeservers_with_active_users() -> Query {
Query::new(
"get_all_homeservers_with_active_users",
"MATCH (u:User)-[:HOSTED_BY]->(hs:Homeserver)
WHERE u.name <> '[DELETED]'
WITH hs.id AS id, count(u) AS active_users
ORDER BY active_users DESC
RETURN collect(id) AS homeservers_list",
Comment thread
ok300 marked this conversation as resolved.
)
}

/// Retrieves user IDs whose homeserver mapping is stale
/// (`resolved_at` is older than `ttl_ms`) or missing (no `HOSTED_BY` edge).
pub fn get_users_needing_hs_resolution(ttl_ms: u64) -> Query {
Query::new(
"get_users_needing_hs_resolution",
Comment thread
ok300 marked this conversation as resolved.
"MATCH (u:User)
WHERE u.name <> '[DELETED]'
OPTIONAL MATCH (u)-[r:HOSTED_BY]->(:Homeserver)
WITH u, r
WHERE r IS NULL
OR r.resolved_at IS NULL
OR r.resolved_at < (timestamp() - $ttl_ms)
RETURN collect(u.id) AS user_ids",
)
.param("ttl_ms", ttl_ms as i64)
}

/// Retrieves all user IDs hosted on a given homeserver.
pub fn get_users_by_homeserver(hs_id: &str) -> Query {
Query::new(
"get_all_homeservers",
"MATCH (hs:Homeserver)
WITH collect(hs.id) AS homeservers_list
RETURN homeservers_list",
"get_users_by_homeserver",
"MATCH (u:User)-[:HOSTED_BY]->(:Homeserver {id: $hs_id})
WHERE u.name <> '[DELETED]'
RETURN collect(u.id) AS user_ids",
Comment thread
ok300 marked this conversation as resolved.
)
.param("hs_id", hs_id.to_string())
}

/// Retrieve tags for a user within the viewer's trusted network
Expand Down
26 changes: 26 additions & 0 deletions nexus-common/src/db/graph/queries/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,29 @@ pub fn create_homeserver(homeserver_id: &str) -> Query {
)
.param("id", homeserver_id)
}

/// Sets the `HOSTED_BY` relationship between a user and a homeserver.
///
/// If the user is already on the target homeserver, refreshes `resolved_at`.
/// If the user is on a different homeserver, replaces the old relationship.
/// MERGEs the target homeserver node if it doesn't exist yet.
pub fn set_user_homeserver(user_id: &str, homeserver_id: &str) -> Query {
Query::new(
"set_user_homeserver",
"MATCH (u:User {id: $user_id})

// Remove existing HOSTED_BY only if homeserver changed
OPTIONAL MATCH (u)-[old:HOSTED_BY]->(old_hs:Homeserver)
WHERE old_hs.id <> $hs_id
DELETE old

WITH u

// Ensure target homeserver and relationship exist
MERGE (hs:Homeserver {id: $hs_id})
MERGE (u)-[r:HOSTED_BY]->(hs)
SET r.resolved_at = timestamp()",
)
.param("user_id", user_id.to_string())
.param("hs_id", homeserver_id.to_string())
}
12 changes: 6 additions & 6 deletions nexus-common/src/models/homeserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@ impl Homeserver {
Ok(())
}

/// Retrieves all homeservers from the graph.
/// Returns all HS IDs with at least one active user, sorted by user count descending.
///
/// # Returns
/// A list of all known homeserver IDs.
/// A list of active homeserver IDs.
///
/// # Errors
/// Throws an error if no homeservers are found.
pub async fn get_all_from_graph() -> GraphResult<Vec<String>> {
let query = queries::get::get_all_homeservers();
/// Returns an error if no active homeservers are found.
pub async fn get_all_active_from_graph() -> GraphResult<Vec<String>> {
Comment thread
ok300 marked this conversation as resolved.
let query = queries::get::get_all_homeservers_with_active_users();
let maybe_hs_ids = fetch_key_from_graph(query, "homeservers_list").await?;
let hs_ids: Vec<String> = maybe_hs_ids.unwrap_or_default();

match hs_ids.is_empty() {
true => Err(GraphError::Generic("No homeservers found in graph".into())),
true => Err(GraphError::Generic("No active HSs found in graph".into())),
false => Ok(hs_ids),
}
}
Expand Down
9 changes: 8 additions & 1 deletion nexus-watcher/src/service/indexer/key_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use nexus_common::models::homeserver::Homeserver;

use super::TEventProcessor;
use crate::events::Moderation;
use crate::service::user_hs_resolver;

/// Event processor for non-default HSs, where the user-specific `/events-stream` endpoint is used
pub struct KeyBasedEventProcessor {
Expand All @@ -27,8 +28,14 @@ impl TEventProcessor for KeyBasedEventProcessor {
&self.moderation
}

// TODO Implement
async fn run_internal(self: Arc<Self>) -> Result<(), EventProcessorError> {
let current_hs_id = self.homeserver.id.to_string();

// Find monitored user IDs from this HS
let _user_ids = user_hs_resolver::get_user_ids_by_homeserver(&current_hs_id);

// TODO Implement: fetch events per user

Ok(())
}
}
17 changes: 13 additions & 4 deletions nexus-watcher/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod indexer;
pub mod runner;
pub mod stats;
mod task_runner;
pub mod user_hs_resolver;

/// Module exports
pub use constants::{PROCESSING_TIMEOUT_SECS, WATCHER_CONFIG_FILE_NAME};
Expand Down Expand Up @@ -71,20 +72,25 @@ impl NexusWatcher {

/// Starts the Nexus Watcher with parallel periodic task loops.
///
/// Currently runs two tasks:
/// Currently runs three tasks:
/// 1. **Default homeserver**: Processes events from the default homeserver defined in [`WatcherConfig`].
/// 2. **External homeservers**: Processes events from all external monitored homeservers, excluding the default.
/// 3. **User HS resolver**: Resolves each user's homeserver and persists `HOSTED_BY` relationships.
///
/// All tasks share the same tick interval ([`WatcherConfig::watcher_sleep`]) and listen for
/// the shutdown signal to exit gracefully. If any task panics, an internal cancellation
/// signal is sent so that sibling tasks can finish their current iteration and exit.
/// The event-processing tasks share the same tick interval ([`WatcherConfig::watcher_sleep`]),
/// while the HS resolver uses its own interval ([`WatcherConfig::hs_resolver_sleep`]).
/// All tasks listen for the shutdown signal to exit gracefully. If any task panics,
/// an internal cancellation signal is sent so that sibling tasks can finish their
/// current iteration and exit.
pub async fn start(shutdown_rx: Receiver<bool>, config: WatcherConfig) -> Result<(), DynError> {
debug!(?config, "Running NexusWatcher with ");

let config_hs = PubkyId::try_from(config.homeserver.as_str())?;
Homeserver::persist_if_unknown(config_hs).await?;

let watcher_sleep = config.watcher_sleep;
let hs_resolver_sleep = config.hs_resolver_sleep;
let hs_resolver_ttl = config.hs_resolver_ttl;

let hs_runner = Arc::new(HsEventProcessorRunner::from_config(
&config,
Expand All @@ -104,6 +110,9 @@ impl NexusWatcher {
let runner = key_based_runner.clone();
async move { runner.run().await.map(|_| ()) }
}),
PeriodicTask::new("user-hs-resolver", hs_resolver_sleep, move || async move {
user_hs_resolver::run(hs_resolver_ttl).await
}),
];

let task_results = run_periodic_tasks(tasks, shutdown_rx).await;
Expand Down
5 changes: 3 additions & 2 deletions nexus-watcher/src/service/runner/homeserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ impl TEventProcessorRunner for HsEventProcessorRunner {
self.shutdown_rx.clone()
}

async fn build(&self, hs_id: String) -> Result<Arc<dyn TEventProcessor>, DynError> {
let homeserver_id = PubkyId::try_from(&hs_id)?;
/// Creates and returns a new event processor instance for the specified homeserver
async fn build(&self, homeserver_id: String) -> Result<Arc<dyn TEventProcessor>, DynError> {
let homeserver_id = PubkyId::try_from(&homeserver_id)?;
let homeserver = Homeserver::get_by_id(homeserver_id)
.await?
.ok_or("Homeserver not found")?;
Expand Down
4 changes: 3 additions & 1 deletion nexus-watcher/src/service/runner/key_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ impl KeyBasedEventProcessorRunner {
/// Returns the homeserver IDs relevant for this run, ordered by their priority.
/// The default homeserver is excluded from this list.
async fn hs_by_priority(&self) -> Result<Vec<String>, DynError> {
let hs_ids = Homeserver::get_all_from_graph().await?;
let hs_ids = Homeserver::get_all_active_from_graph().await?;
let default_hs = self.default_homeserver.as_str();

// Exclude the default homeserver from the list, as it is processed separately
// The default HS is not expected to be active, but we still filter as an extra precaution
let hs_ids: Vec<String> = hs_ids
.into_iter()
.filter(|hs_id| hs_id != default_hs)
Expand Down
2 changes: 1 addition & 1 deletion nexus-watcher/src/service/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait TEventProcessorRunner: Send + Sync {

/// Pre-processing step before the main run loop.
///
/// Determines the list of target IDs to process in this run cycle.
/// Determines the list of target HS IDs to process in this run cycle.
async fn pre_run(&self) -> Result<Vec<String>, DynError>;

/// Post-processing of the run results.
Expand Down
Loading