Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 216 additions & 18 deletions nexus-common/src/models/homeserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,63 @@ use crate::models::user::UserDetails;
use pubky::PublicKey;
use pubky_app_specs::ParsedUri;
use pubky_app_specs::PubkyId;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use tracing::info;

/// Deserializes cursor from either a JSON string or number.
///
/// This handles backwards compatibility with old data where cursor was stored as a string
/// (e.g., `"0000000000000"`), while also supporting the new numeric format.
fn deserialize_cursor<'de, D>(deserializer: D) -> Result<u64, D::Error>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered migrating data in redis instead of supporting both formats ?

where
D: Deserializer<'de>,
{
use serde::de::Error;

#[derive(Deserialize)]
#[serde(untagged)]
enum CursorValue {
Number(u64),
String(String),
}

match CursorValue::deserialize(deserializer)? {
CursorValue::Number(n) => Ok(n),
CursorValue::String(s) => s
.parse()
.map_err(|_| D::Error::custom(format!("Cannot parse cursor string '{s}' as u64"))),
}
}

/// Represents a homeserver with its public key, URL, and cursor.
#[derive(Serialize, Deserialize, Debug)]
pub struct Homeserver {
pub id: PubkyId,

// We persist this field only in the cache, but not in the graph.
// Redis has regular snapshots, which ensures we get a recent state in case of RAM data loss (system crash).
pub cursor: String,
#[serde(deserialize_with = "deserialize_cursor")]
pub cursor: u64,
}

impl RedisOps for Homeserver {}

impl Homeserver {
/// Instantiates a new homeserver with default cursor
pub fn new(id: PubkyId) -> Self {
Homeserver {
id,
cursor: "0000000000000".to_string(),
}
Homeserver { id, cursor: 0 }
}

/// Creates a new homeserver instance with the specified cursor
pub fn try_from_cursor<T: Into<String>>(id: PubkyId, cursor: T) -> ModelResult<Self> {
let cursor = cursor.into();
if cursor.is_empty() {
return Err(ModelError::from_generic(
"Cannot create a homeserver from an empty cursor",
));
}
pub async fn try_from_cursor<T: Into<String>>(id: PubkyId, cursor: T) -> ModelResult<Self> {
Comment thread
aintnostressin marked this conversation as resolved.
let cursor_str = cursor.into();
let cursor = cursor_str.parse().map_err(|_| {
ModelError::from_generic(format!(
"Cannot create a HS from a non-numeric cursor: {cursor_str}"
))
})?;

Self::validate_cursor_change(&id, cursor).await?;

Ok(Homeserver { id, cursor })
}
Expand Down Expand Up @@ -74,11 +99,8 @@ impl Homeserver {

/// Stores this homeserver in Redis.
pub async fn put_to_index(&self) -> RedisResult<()> {
if self.cursor.is_empty() {
return Err(RedisError::InvalidInput(
"Cannot save to index a homeserver with an empty cursor".into(),
));
}
Self::validate_cursor_change(&self.id, self.cursor).await?;

self.put_index_json(&[&self.id], None, None).await
}

Expand All @@ -87,6 +109,8 @@ impl Homeserver {
Some(hs) => Ok(Some(hs)),
None => match Self::get_from_graph(&homeserver_id).await? {
Some(hs_from_graph) => {
// This assumes the index and the graph are in-sync
// If they are not (e.g. Redis lost a HS entry but graph still has it), put_to_index will persist with cursor = 0
hs_from_graph.put_to_index().await?;
Ok(Some(hs_from_graph))
}
Expand All @@ -95,6 +119,19 @@ impl Homeserver {
}
}

async fn validate_cursor_change(id: &str, new_cursor: u64) -> RedisResult<()> {
// If we already indexed a value, reject cursors going below it to prevent reindexing past events
if let Some(hs_from_index) = Self::get_from_index(id).await? {
if new_cursor < hs_from_index.cursor {
return Err(RedisError::InvalidInput(
"Cursor cannot move backwards".into(),
));
}
}

Ok(())
}

/// Verifies if homeserver exists in the graph, or persists it if missing
pub async fn persist_if_unknown(homeserver_id: PubkyId) -> ModelResult<()> {
if Self::get_from_graph(&homeserver_id).await?.is_none() {
Expand Down Expand Up @@ -221,4 +258,165 @@ mod tests {

Ok(())
}

#[tokio_shared_rt::test(shared)]
async fn test_cursor_forwards_accepted() -> Result<(), DynError> {
StackManager::setup("unit-hs-test", &StackConfig::default()).await?;

let keys = Keypair::random();
let id = PubkyId::try_from(&keys.public_key().to_z32())?;

// Store cursor at 100
let hs = Homeserver {
id: id.clone(),
cursor: 100,
};
hs.put_to_index()
.await
.expect("Failed to put initial cursor");

// Moving cursor forward to 200 must succeed
let hs2 = Homeserver {
id: id.clone(),
cursor: 200,
};
hs2.put_to_index()
.await
.expect("Forward cursor update should be accepted");

let stored = Homeserver::get_from_index(&id)
.await
.unwrap()
.expect("Homeserver not found in index");
assert_eq!(stored.cursor, 200);

Ok(())
}

#[tokio_shared_rt::test(shared)]
async fn test_cursor_backwards_rejected_by_put_to_index() -> Result<(), DynError> {
StackManager::setup("unit-hs-test", &StackConfig::default()).await?;

let keys = Keypair::random();
let id = PubkyId::try_from(&keys.public_key().to_z32())?;

// Store cursor at 500
let hs = Homeserver {
id: id.clone(),
cursor: 500,
};
hs.put_to_index()
.await
.expect("Failed to put initial cursor");

// Attempting to move cursor backwards to 100 must be rejected
let hs2 = Homeserver {
id: id.clone(),
cursor: 100,
};
let result = hs2.put_to_index().await;
assert!(
result.is_err(),
"Backwards cursor update should be rejected"
);

// The stored cursor must remain at 500
let stored = Homeserver::get_from_index(&id)
.await
.unwrap()
.expect("Homeserver not found in index");
assert_eq!(stored.cursor, 500);

Ok(())
}

#[tokio_shared_rt::test(shared)]
async fn test_cursor_backwards_rejected_by_try_from_cursor() -> Result<(), DynError> {
StackManager::setup("unit-hs-test", &StackConfig::default()).await?;

let keys = Keypair::random();
let id = PubkyId::try_from(&keys.public_key().to_z32())?;

// Store cursor at 300
let hs = Homeserver {
id: id.clone(),
cursor: 300,
};
hs.put_to_index()
.await
.expect("Failed to put initial cursor");

// try_from_cursor with a lower value must fail
let result = Homeserver::try_from_cursor(id.clone(), "50").await;
assert!(
result.is_err(),
"try_from_cursor with backwards cursor should be rejected"
);

// try_from_cursor with a higher value must succeed
let result = Homeserver::try_from_cursor(id.clone(), "400").await;
assert!(
result.is_ok(),
"try_from_cursor with forward cursor should be accepted"
);
assert_eq!(result.unwrap().cursor, 400);

Ok(())
}

#[tokio_shared_rt::test(shared)]
async fn test_cursor_equal_value_accepted() -> Result<(), DynError> {
StackManager::setup("unit-hs-test", &StackConfig::default()).await?;

let keys = Keypair::random();
let id = PubkyId::try_from(&keys.public_key().to_z32())?;

// Store cursor at 200
let hs = Homeserver {
id: id.clone(),
cursor: 200,
};
hs.put_to_index()
.await
.expect("Failed to put initial cursor");

// Writing the same cursor value (not backwards) must succeed
let hs2 = Homeserver {
id: id.clone(),
cursor: 200,
};
hs2.put_to_index()
.await
.expect("Same cursor value should be accepted");

Ok(())
}

#[test]
fn test_deserialize_cursor_from_string() {
// Simulates old data format where cursor was stored as a string
let json = r#"{"id":"o1gg96ewuojmopc9qcp6j3kk5rn1b81ks6hisk7jitpptgeo3dty","cursor":"0000000000000"}"#;
let hs: Homeserver = serde_json::from_str(json).expect("Failed to deserialize");
assert_eq!(hs.cursor, 0);

// Also test with a non-zero string cursor
let json =
r#"{"id":"o1gg96ewuojmopc9qcp6j3kk5rn1b81ks6hisk7jitpptgeo3dty","cursor":"12345"}"#;
let hs: Homeserver = serde_json::from_str(json).expect("Failed to deserialize");
assert_eq!(hs.cursor, 12345);
}

#[test]
fn test_deserialize_cursor_from_number() {
// Current format where cursor is stored as a number
let json = r#"{"id":"o1gg96ewuojmopc9qcp6j3kk5rn1b81ks6hisk7jitpptgeo3dty","cursor":0}"#;
let hs: Homeserver = serde_json::from_str(json).expect("Failed to deserialize");
assert_eq!(hs.cursor, 0);

// Also test with a non-zero numeric cursor
let json =
r#"{"id":"o1gg96ewuojmopc9qcp6j3kk5rn1b81ks6hisk7jitpptgeo3dty","cursor":98765}"#;
let hs: Homeserver = serde_json::from_str(json).expect("Failed to deserialize");
assert_eq!(hs.cursor, 98765);
}
}
2 changes: 1 addition & 1 deletion nexus-watcher/src/service/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl EventProcessor {

if let Some(cursor) = line.strip_prefix("cursor: ") {
info!("Received cursor for the next request: {cursor}");
match Homeserver::try_from_cursor(id, cursor) {
match Homeserver::try_from_cursor(id, cursor).await {
Ok(hs) => hs.put_to_index().await?,
Err(e) => warn!("{e}"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this is going to fail each time, unless homeserver somehow returns cursor that is higher that currently stored.

Should we have a metric and get notified that syncing is stalled because of it ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the HS returns event lines, the last one will have a cursor that is higher than given cursor. For example https://homeserver.pubky.app/events/?cursor=150000&limit=1

If the given cursor doesn't exist, the HS returns no lines : https://homeserver.pubky.app/events/?cursor=250000&limit=1

this is going to fail each time, unless homeserver somehow returns cursor that is higher that currently stored

So it seems to me the condition you describe won't happen, because the HS always returns a higher cursor than the current one (arg), else returns no lines at all.

Or am I missing smth?

}
Expand Down
44 changes: 1 addition & 43 deletions nexus-watcher/tests/event_processor/posts/raw.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use super::utils::find_post_details;
use super::utils::{
check_member_global_timeline_user_post, check_member_user_post_timeline, find_post_counts,
Expand All @@ -8,10 +6,9 @@ use crate::event_processor::users::utils::{check_member_user_influencer, find_us
use crate::event_processor::utils::watcher::WatcherTest;
use anyhow::Result;
use nexus_common::models::event::Event;
use nexus_common::models::homeserver::Homeserver;
use nexus_common::models::post::{PostCounts, PostDetails};
use pubky::Keypair;
use pubky_app_specs::{PubkyAppPost, PubkyAppPostKind, PubkyAppUser, PubkyId};
use pubky_app_specs::{PubkyAppPost, PubkyAppPostKind, PubkyAppUser};

#[tokio_shared_rt::test(shared)]
async fn test_homeserver_put_post_event() -> Result<()> {
Expand Down Expand Up @@ -67,15 +64,6 @@ async fn test_homeserver_put_post_event() -> Result<()> {
assert_eq!(post_details.uri, post_detail_cache.uri);
assert_eq!(post_details.indexed_at, post_detail_cache.indexed_at);

reindex_and_ensure_cache_and_graph_unchanged(
&mut test,
&post_details,
&post_detail_cache,
&user_id,
&post_id,
)
.await?;

// User:Counts:user_id:post_id
let post_counts: PostCounts = find_post_counts(&user_id, &post_id).await;
assert_eq!(post_counts.reposts, 0);
Expand Down Expand Up @@ -117,33 +105,3 @@ async fn test_homeserver_put_post_event() -> Result<()> {

Ok(())
}

async fn reindex_and_ensure_cache_and_graph_unchanged(
test: &mut WatcherTest,
post_details_from_graph: &PostDetails,
post_detail_from_cache: &PostDetails,
user_id: &str,
post_id: &str,
) -> Result<()> {
// Wait for a few ms, so that re-indexing determines a different indexed_at (epoch timestamp in ms)
tokio::time::sleep(Duration::from_millis(10)).await;

// Reset the cursor, to ensure the events are re-indexed
let homeserver = Homeserver::new(PubkyId::try_from(&test.homeserver_id).unwrap());
homeserver.put_to_graph().await.unwrap();
homeserver.put_to_index().await.unwrap();
test.ensure_event_processing_complete().await?;

// Check that nothing changed in the graph (DB)
let post_details_2 = find_post_details(user_id, post_id).await.unwrap();
assert_eq!(post_details_from_graph, &post_details_2);

// Check that nothing changed in the index (cache)
let post_detail_cache_2: PostDetails = PostDetails::get_from_index(user_id, post_id)
.await
.unwrap()
.expect("The new post detail was not served from Nexus cache");
assert_eq!(post_detail_from_cache, &post_detail_cache_2);

Ok(())
}
Loading