Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6a06e9c
deps: bump pubky to relevant branch
ok300 Feb 15, 2026
c1c6c98
feat: split NexusWatcher into 3 parallel threads and exclude default …
Copilot Feb 15, 2026
efab342
Rename vars, methods to indicate they apply to external HSs
ok300 Feb 15, 2026
427e65d
deps: include latest updates from pubky
ok300 Feb 19, 2026
8dcdccf
Initial plan
Copilot Feb 19, 2026
3aa9a5d
fix: use to_z32() for PubkyId conversion to resolve pkarr version mis…
Copilot Feb 19, 2026
f00aba8
Merge pull request #71 from ok300/copilot/sub-pr-70
ok300 Feb 19, 2026
cb935d6
Fix PubkyId conversion from PublicKey
ok300 Feb 19, 2026
033aa4c
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Feb 19, 2026
fb9c064
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Feb 25, 2026
e681907
Bump pubky dependencies to latest 0.7.x
ok300 Feb 27, 2026
8c73b86
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Feb 27, 2026
51e113a
Fix merge conflicts
ok300 Feb 27, 2026
7b4a164
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Mar 2, 2026
0fc56ba
Bump pubky dependencies to latest 0.7.x
ok300 Mar 2, 2026
8e15bc5
Bump pubky dependencies to 0.7.0-rc2
ok300 Mar 2, 2026
f57d82c
refactor: extract periodic task loop into reusable `run_periodic_task…
ok300 Mar 6, 2026
6713068
test: Use unseeded testnets for nexus-watcher tests (#758)
ok300 Mar 9, 2026
a4d8756
`nexus-api` tests: use unseeded `Testnet` (#753)
ok300 Mar 9, 2026
878ce57
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Mar 10, 2026
cff0a52
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Mar 17, 2026
5ad570a
deps: bump pubky to latest release 0.7.0
ok300 Mar 18, 2026
0229c17
Fix flaky test, task runner scheduling
ok300 Mar 21, 2026
97a9a26
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Mar 23, 2026
49af0a0
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Mar 30, 2026
d1578a0
ref: runners and indexers (#783)
tipogi Apr 3, 2026
ccf5bb1
Merge remote-tracking branch 'origin/main' into feat/dx-events-by-user
ok300 Apr 3, 2026
265bc61
`TEventProcessor`: expand logic of `handle_error` (#788)
ok300 Apr 3, 2026
b318069
lint: cargo clippy
ok300 Apr 3, 2026
ec43bd7
feat: add `user-hs-resolver` task (#765)
ok300 Apr 3, 2026
fc72fb7
fix: empty external HS list should not result in error
ok300 Apr 3, 2026
e915f02
fix: ingest unknown users (#795)
tipogi Apr 6, 2026
551eda3
chore: add TEventProcessor::instance_name() (#798)
ok300 Apr 6, 2026
6bb307b
fix: remove stale HOSTED_BY edge if no HS is found for user (#791)
ok300 Apr 8, 2026
93ce106
On HS resolution, order PKs with bisection ordering (#797)
ok300 Apr 9, 2026
38108c5
chore: simplify Moderation init (#805)
ok300 Apr 9, 2026
a16b12f
chore: remove superfluous error helpers (#804)
ok300 Apr 9, 2026
49b6a4f
Merge branch 'origin-main' into feat/dx-events-by-user
ok300 Apr 11, 2026
04eb3d5
feat: record metrics for each run of the HS resolver task (#807)
ok300 Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 26 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/watcher/watcher-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ testnet = false
homeserver = "8um71us3fyw6h8wbcxb5ar3rwusy1a6u49956ikzojg3gcwd1dty"
events_limit = 50
watcher_sleep = 5000
hs_resolver_sleep = 10000
# Initial backoff duration (in seconds) after the first failure of a homeserver
initial_backoff_secs = 60
# Maximum backoff duration (in seconds) for a failing homeserver
Expand Down
3 changes: 3 additions & 0 deletions nexus-common/default.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ events_limit = 50
# Maximum number of monitored homeservers. If set to 1, only the default homeserver is monitored.
monitored_homeservers_limit = 50
watcher_sleep = 5000
hs_resolver_sleep = 10000
# Minimum time (ms) before a user's homeserver mapping is considered stale and therefore eligible to be re-resolved (default: 1 hour)
hs_resolver_ttl = 3600000
# Initial backoff duration (in seconds) after the first failure of a homeserver
initial_backoff_secs = 60
# Maximum backoff duration (in seconds) for a failing homeserver
Expand Down
1 change: 1 addition & 0 deletions nexus-common/src/config/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ mod tests {
);
assert_eq!(c.watcher.events_limit, 50);
assert_eq!(c.watcher.watcher_sleep, 5_000);
assert_eq!(c.watcher.hs_resolver_sleep, 10_000);
assert_eq!(
c.watcher.moderation_id,
PubkyId::try_from("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap()
Expand Down
4 changes: 3 additions & 1 deletion nexus-common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ pub use api::ApiConfig;
pub use daemon::DaemonConfig;
pub use stack::{default_stack, OtlpConfig, StackConfig};
pub use watcher::WatcherConfig;
pub use watcher::{DEFAULT_INITIAL_BACKOFF_SECS, DEFAULT_MAX_BACKOFF_SECS};
pub use watcher::{
DEFAULT_HS_RESOLVER_TTL, DEFAULT_INITIAL_BACKOFF_SECS, DEFAULT_MAX_BACKOFF_SECS,
};

use crate::file::validate_and_expand_path;

Expand Down
29 changes: 29 additions & 0 deletions nexus-common/src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub const DEFAULT_EVENTS_LIMIT: u32 = 1_000;
pub const DEFAULT_MONITORED_HOMESERVERS_LIMIT: usize = 50;
/// Default for [WatcherConfig::watcher_sleep]
pub const DEFAULT_WATCHER_SLEEP: u64 = 5_000;
/// Default for [WatcherConfig::hs_resolver_sleep]
pub const DEFAULT_HS_RESOLVER_SLEEP: u64 = 10_000;
/// Default for [WatcherConfig::hs_resolver_ttl]: 1 hour in milliseconds
pub const DEFAULT_HS_RESOLVER_TTL: u64 = 3_600_000;
/// Default for [WatcherConfig::initial_backoff_secs]
pub const DEFAULT_INITIAL_BACKOFF_SECS: u64 = 60;
/// Default for [WatcherConfig::max_backoff_secs]
Expand All @@ -36,14 +40,28 @@ pub const MODERATED_TAGS: [&str; 6] = [
pub struct WatcherConfig {
pub testnet: bool,
pub testnet_host: String,

/// Default homeserver. Other homeservers may be ingested in addition, but this one is prioritized.
pub homeserver: PubkyId,

/// Maximum number of events to fetch per run from each homeserver
pub events_limit: u32,

/// Maximum number of monitored homeservers
pub monitored_homeservers_limit: usize,

/// Sleep between every full run (over all monitored homeservers), in milliseconds
pub watcher_sleep: u64,

/// Sleep between every run of the user HS resolver periodic task, in milliseconds
#[serde(default = "default_hs_resolver_sleep")]
pub hs_resolver_sleep: u64,

/// Minimum time (ms) before a user's homeserver mapping is re-resolved.
/// Users whose `HOSTED_BY.resolved_at` is newer than this TTL are skipped.
#[serde(default = "default_hs_resolver_ttl")]
pub hs_resolver_ttl: u64,

/// Initial backoff duration (in seconds) after the first failure of a homeserver
#[serde(default = "default_initial_backoff_secs")]
pub initial_backoff_secs: u64,
Expand All @@ -52,6 +70,7 @@ pub struct WatcherConfig {
pub max_backoff_secs: u64,
#[serde(default = "default_stack")]
pub stack: StackConfig,

// Moderation
pub moderation_id: PubkyId,
pub moderated_tags: Vec<String>,
Expand All @@ -74,6 +93,8 @@ impl Default for WatcherConfig {
events_limit: DEFAULT_EVENTS_LIMIT,
monitored_homeservers_limit: DEFAULT_MONITORED_HOMESERVERS_LIMIT,
watcher_sleep: DEFAULT_WATCHER_SLEEP,
hs_resolver_sleep: DEFAULT_HS_RESOLVER_SLEEP,
hs_resolver_ttl: DEFAULT_HS_RESOLVER_TTL,
initial_backoff_secs: DEFAULT_INITIAL_BACKOFF_SECS,
max_backoff_secs: DEFAULT_MAX_BACKOFF_SECS,
moderation_id,
Expand All @@ -82,6 +103,14 @@ impl Default for WatcherConfig {
}
}

fn default_hs_resolver_sleep() -> u64 {
DEFAULT_HS_RESOLVER_SLEEP
}

fn default_hs_resolver_ttl() -> u64 {
DEFAULT_HS_RESOLVER_TTL
}

/// Converts a [`DaemonConfig`] into an [`WatcherConfig`], extracting only the Watcher-related settings
/// and the shared application stack
impl From<DaemonConfig> for WatcherConfig {
Expand Down
11 changes: 11 additions & 0 deletions nexus-common/src/db/graph/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ pub enum GraphError {
Generic(String),
}

impl GraphError {
#[allow(clippy::match_like_matches_macro)]
pub fn is_infrastructure_err(&self) -> bool {
match self {
GraphError::ConnectionNotInitialized => true,
GraphError::QueryFailed(_) => true,
_ => false,
}
}
}

impl From<neo4rs::DeError> for GraphError {
fn from(e: neo4rs::DeError) -> Self {
GraphError::DeserializationFailed(Box::new(e))
Expand Down
10 changes: 10 additions & 0 deletions nexus-common/src/db/graph/queries/del.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ pub fn delete_tag(user_id: &str, tag_id: &str, app: Option<&str>) -> Query {
query
}

/// Removes the `HOSTED_BY` relationship from a user, if one exists.
pub fn remove_user_homeserver(user_id: &str) -> Query {
Query::new(
"remove_user_homeserver",
"MATCH (u:User {id: $user_id})-[r:HOSTED_BY]->(:Homeserver)
DELETE r;",
)
.param("user_id", user_id.to_string())
}

/// Deletes a file node and all its relationships
/// # Arguments
/// * `owner_id` - The unique identifier of the user who owns the file
Expand Down
46 changes: 40 additions & 6 deletions nexus-common/src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,48 @@ pub fn get_homeserver_by_id(id: &str) -> Query {
.param("id", id)
}

/// Retrieves all homeserver IDs
pub fn get_all_homeservers() -> Query {
/// Retrieves all homeserver IDs that have at least one active user
/// (incoming `HOSTED_BY` relationships from `User` nodes).
///
/// The results are sorted by the number of active users in descending order.
/// Returns a single `homeservers_list` column containing the collected IDs.
pub fn get_all_homeservers_with_active_users() -> Query {
Query::new(
"get_all_homeservers_with_active_users",
"MATCH (u:User)-[:HOSTED_BY]->(hs:Homeserver)
WHERE u.name <> '[DELETED]'
WITH hs.id AS id, count(u) AS active_users
ORDER BY active_users DESC
RETURN collect(id) AS homeservers_list",
)
}

/// Retrieves user IDs whose homeserver mapping is stale
/// (`resolved_at` is older than `ttl_ms`) or missing (no `HOSTED_BY` edge).
pub fn get_users_needing_hs_resolution(ttl_ms: u64) -> Query {
Query::new(
"get_users_needing_hs_resolution",
"MATCH (u:User)
WHERE u.name <> '[DELETED]'
OPTIONAL MATCH (u)-[r:HOSTED_BY]->(:Homeserver)
WITH u, r
WHERE r IS NULL
OR r.resolved_at IS NULL
OR r.resolved_at < (timestamp() - $ttl_ms)
RETURN collect(u.id) AS user_ids",
)
.param("ttl_ms", ttl_ms as i64)
}

/// Retrieves all user IDs hosted on a given homeserver.
pub fn get_users_by_homeserver(hs_id: &str) -> Query {
Query::new(
"get_all_homeservers",
"MATCH (hs:Homeserver)
WITH collect(hs.id) AS homeservers_list
RETURN homeservers_list",
"get_users_by_homeserver",
"MATCH (u:User)-[:HOSTED_BY]->(:Homeserver {id: $hs_id})
WHERE u.name <> '[DELETED]'
RETURN collect(u.id) AS user_ids",
)
.param("hs_id", hs_id.to_string())
}

/// Retrieve tags for a user within the viewer's trusted network
Expand Down
26 changes: 26 additions & 0 deletions nexus-common/src/db/graph/queries/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,29 @@ pub fn create_homeserver(homeserver_id: &str) -> Query {
)
.param("id", homeserver_id)
}

/// Sets the `HOSTED_BY` relationship between a user and a homeserver.
///
/// If the user is already on the target homeserver, refreshes `resolved_at`.
/// If the user is on a different homeserver, replaces the old relationship.
/// MERGEs the target homeserver node if it doesn't exist yet.
pub fn set_user_homeserver(user_id: &str, homeserver_id: &str) -> Query {
Query::new(
"set_user_homeserver",
"MATCH (u:User {id: $user_id})

// Remove existing HOSTED_BY only if homeserver changed
OPTIONAL MATCH (u)-[old:HOSTED_BY]->(old_hs:Homeserver)
WHERE old_hs.id <> $hs_id
DELETE old

WITH u

// Ensure target homeserver and relationship exist
MERGE (hs:Homeserver {id: $hs_id})
MERGE (u)-[r:HOSTED_BY]->(hs)
SET r.resolved_at = timestamp()",
)
.param("user_id", user_id.to_string())
.param("hs_id", homeserver_id.to_string())
}
12 changes: 12 additions & 0 deletions nexus-common/src/db/kv/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ pub enum RedisError {
InvalidInput(String),
}

impl RedisError {
#[allow(clippy::match_like_matches_macro)]
pub fn is_infrastructure_err(&self) -> bool {
match self {
RedisError::ConnectionNotInitialized => true,
RedisError::ConnectionPoolError(_) => true,
RedisError::IoError(_) => true,
_ => false,
}
}
}

impl From<redis::RedisError> for RedisError {
fn from(e: redis::RedisError) -> Self {
if e.is_connection_refusal() || e.is_timeout() || e.is_io_error() {
Expand Down
Loading
Loading