Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nexus-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ license = "MIT"
async-trait = { workspace = true }
dirs = "6.0.0"
chrono = { workspace = true }
blake3 = "1.5"
url = "2.5"
neo4rs = { workspace = true }
once_cell = "1.21.3"
opentelemetry = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion nexus-common/src/db/graph/queries/del.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ pub fn delete_tag(user_id: &str, tag_id: &str) -> Query {
WITH CASE WHEN target:User THEN target.id ELSE null END AS user_id,
CASE WHEN target:Post THEN target.id ELSE null END AS post_id,
CASE WHEN target:Post THEN author.id ELSE null END AS author_id,
CASE WHEN target:ExternalLink THEN target.id ELSE null END AS external_link_id,
tag.label AS label,
tag
DELETE tag
RETURN user_id, post_id, author_id, label",
RETURN user_id, post_id, author_id, external_link_id, label",
)
.param("user_id", user_id)
.param("tag_id", tag_id)
Expand Down
22 changes: 22 additions & 0 deletions nexus-common/src/db/graph/queries/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,28 @@ pub fn user_tags(user_id: &str) -> neo4rs::Query {
.param("user_id", user_id)
}

/// Build the query that aggregates the tags attached to one external link.
///
/// The `ExternalLink` node is matched by `link_id`; a subquery then groups the
/// incoming `TAGGED` relationships by label and collects, per label, the
/// distinct tagger ids together with their count.
///
/// The query returns two columns: `exists` and `tags` (a list of
/// `{label, taggers, taggers_count}` maps).
pub fn external_link_tags(link_id: &str) -> neo4rs::Query {
    let cypher = "
MATCH (link:ExternalLink {id: $link_id})
CALL {
WITH link
MATCH (tagger:User)-[tag:TAGGED]->(link)
WITH tag.label AS name, collect(DISTINCT tagger.id) AS tagger_ids
RETURN collect({
label: name,
taggers: tagger_ids,
taggers_count: SIZE(tagger_ids)
}) AS tags
}
RETURN
link IS NOT NULL AS exists,
tags
";

    query(cypher).param("link_id", link_id)
}

/// Retrieve a homeserver by ID
pub fn get_homeserver_by_id(id: &str) -> Query {
query(
Expand Down
60 changes: 51 additions & 9 deletions nexus-common/src/db/graph/queries/put.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::models::post::PostRelationships;
use crate::models::{file::FileDetails, post::PostDetails, user::UserDetails};
use crate::models::{
file::FileDetails, link::ExternalLinkDetails, post::PostDetails, user::UserDetails,
};
use crate::types::DynError;
use neo4rs::{query, Query};
use pubky_app_specs::{ParsedUri, Resource};
Expand Down Expand Up @@ -271,14 +273,14 @@ pub fn create_user_tag(
) -> Query {
query(
"MATCH (tagged_used:User {id: $tagged_user_id})
MATCH (tagger:User {id: $tagger_user_id})
// Check if tag already existed
OPTIONAL MATCH (tagger)-[existing:TAGGED {label: $label}]->(tagged_used)
MERGE (tagger)-[t:TAGGED {label: $label}]->(tagged_used)
SET t.indexed_at = $indexed_at,
t.id = $tag_id
// Returns true if the user tag relationship already existed
RETURN existing IS NOT NULL AS flag;",
MATCH (tagger:User {id: $tagger_user_id})
// Check if tag already existed
OPTIONAL MATCH (tagger)-[existing:TAGGED {label: $label}]->(tagged_used)
MERGE (tagger)-[t:TAGGED {label: $label}]->(tagged_used)
SET t.indexed_at = $indexed_at,
t.id = $tag_id
// Returns true if the user tag relationship already existed
RETURN existing IS NOT NULL AS flag;",
)
.param("tagger_user_id", tagger_user_id)
.param("tagged_user_id", tagged_user_id)
Expand All @@ -287,6 +289,46 @@ pub fn create_user_tag(
.param("indexed_at", indexed_at)
}

/// Build the query that tags an external link on behalf of a user.
///
/// The `ExternalLink` node is merged by id. On creation all link properties
/// are written; on match, `coalesce` keeps any property that is already set
/// and only fills in missing ones. The `TAGGED` relationship is then merged
/// per `(tagger, label, link)` and stamped with `indexed_at` and `tag_id`.
///
/// The single returned column, `flag`, is `true` when the tagger already had a
/// `TAGGED` relationship with this label pointing at the link.
pub fn create_external_link_tag(
    tagger_user_id: &str,
    link: &ExternalLinkDetails,
    tag_id: &str,
    label: &str,
    indexed_at: i64,
) -> Query {
    let cypher = "MATCH (tagger:User {id: $tagger_user_id})
MERGE (link:ExternalLink {id: $link_id})
ON CREATE SET
link.url = $original_url,
link.normalized_url = $normalized_url,
link.scheme = $scheme,
link.host = $host,
link.created_at = $created_at
ON MATCH SET
link.url = coalesce(link.url, $original_url),
link.normalized_url = coalesce(link.normalized_url, $normalized_url),
link.scheme = coalesce(link.scheme, $scheme),
link.host = coalesce(link.host, $host),
link.created_at = coalesce(link.created_at, $created_at)
OPTIONAL MATCH (tagger)-[existing:TAGGED {label: $label}]->(link)
MERGE (tagger)-[tag:TAGGED {label: $label}]->(link)
SET tag.indexed_at = $indexed_at,
tag.id = $tag_id
RETURN existing IS NOT NULL AS flag;";

    // Who is tagging.
    let statement = query(cypher).param("tagger_user_id", tagger_user_id);

    // Properties of the link node being merged.
    let statement = statement
        .param("link_id", link.id.as_str())
        .param("original_url", link.original_url.as_str())
        .param("normalized_url", link.normalized_url.as_str())
        .param("scheme", link.scheme.as_str())
        .param("host", link.host.clone())
        .param("created_at", link.created_at);

    // Properties of the tag relationship itself.
    statement
        .param("tag_id", tag_id)
        .param("label", label)
        .param("indexed_at", indexed_at)
}

/// Create a file node
pub fn create_file(file: &FileDetails) -> Result<Query, DynError> {
let urls = serde_json::to_string(&file.urls)?;
Expand Down
2 changes: 2 additions & 0 deletions nexus-common/src/db/graph/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub async fn setup_graph() -> Result<(), DynError> {
"CREATE CONSTRAINT uniquePostId IF NOT EXISTS FOR (p:Post) REQUIRE p.id IS UNIQUE",
"CREATE CONSTRAINT uniqueFileId IF NOT EXISTS FOR (f:File) REQUIRE (f.owner_id, f.id) IS UNIQUE",
"CREATE CONSTRAINT uniqueHomeserverId IF NOT EXISTS FOR (hs:Homeserver) REQUIRE hs.id IS UNIQUE",
"CREATE CONSTRAINT uniqueExternalLinkId IF NOT EXISTS FOR (l:ExternalLink) REQUIRE l.id IS UNIQUE",
];

// Create indexes
Expand All @@ -22,6 +23,7 @@ pub async fn setup_graph() -> Result<(), DynError> {
"CREATE INDEX taggedTimestampIndex IF NOT EXISTS FOR ()-[r:TAGGED]-() ON (r.indexed_at)",
"CREATE INDEX fileIdIndex IF NOT EXISTS FOR (f:File) ON (f.owner_id, f.id)",
"CREATE INDEX homeserverIdIndex IF NOT EXISTS FOR (hs:Homeserver) ON (hs.id)",
"CREATE INDEX externalLinkNormalizedUrlIndex IF NOT EXISTS FOR (l:ExternalLink) ON (l.normalized_url)",
];

let queries = constraints.iter().chain(indexes.iter());
Expand Down
33 changes: 33 additions & 0 deletions nexus-common/src/db/reindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::db::graph::exec::fetch_all_rows_from_graph;
use crate::models::follow::{Followers, Following, UserFollows};
use crate::models::post::search::PostsByTagSearch;
use crate::models::post::Bookmark;
use crate::models::tag::external::TagExternal;
use crate::models::tag::post::TagPost;
use crate::models::tag::search::TagSearch;
use crate::models::tag::stream::HotTags;
Expand Down Expand Up @@ -47,6 +48,18 @@ pub async fn sync() {
});
}

let external_link_ids = get_all_external_link_ids()
.await
.expect("Failed to get external link IDs");
let mut external_tasks = JoinSet::new();
for link_id in external_link_ids {
external_tasks.spawn(async move {
if let Err(e) = <TagExternal as TagCollection>::reindex(&link_id, None).await {
tracing::error!("Failed to reindex external link {}: {:?}", link_id, e);
}
});
}

while let Some(res) = user_tasks.join_next().await {
if let Err(e) = res {
tracing::error!("User reindexing task failed: {:?}", e);
Expand All @@ -59,6 +72,12 @@ pub async fn sync() {
}
}

while let Some(res) = external_tasks.join_next().await {
if let Err(e) = res {
tracing::error!("External link reindexing task failed: {:?}", e);
}
}

HotTags::reindex()
.await
.expect("Failed to store the global hot tags");
Expand Down Expand Up @@ -128,3 +147,17 @@ async fn get_all_post_ids() -> Result<Vec<(String, String)>, DynError> {

Ok(post_ids)
}

/// Collect the `id` of every `ExternalLink` node in the graph.
///
/// Rows whose `link_id` column is null are skipped; any fetch or
/// deserialization error is propagated to the caller.
async fn get_all_external_link_ids() -> Result<Vec<String>, DynError> {
    let statement = query("MATCH (link:ExternalLink) RETURN link.id AS link_id");

    let mut collected = Vec::new();
    for row in fetch_all_rows_from_graph(statement).await? {
        let maybe_id: Option<String> = row.get("link_id")?;
        // `Option` is an iterator of zero or one items, so this pushes only non-null ids.
        collected.extend(maybe_id);
    }

    Ok(collected)
}
147 changes: 147 additions & 0 deletions nexus-common/src/models/link/external.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::db::RedisOps;
use crate::types::DynError;
use blake3::Hash;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use url::Url;

/// Key prefix returned by the `RedisOps::prefix` implementation for `ExternalLinkDetails`.
const EXTERNAL_LINK_KEY_PREFIX: &str = "ExternalLink";
/// Number of hex characters kept from the BLAKE3 digest when deriving a link id.
/// Each hex character encodes 4 bits, so 32 characters carry 128 bits.
const EXTERNAL_LINK_HASH_LEN: usize = 32; // 128 bits represented as hex

/// Errors that can occur while parsing and normalizing an external URL.
#[derive(Debug, Error)]
pub enum ExternalLinkError {
/// The raw input could not be parsed as a URL; converted automatically from `url::ParseError`.
#[error("failed to parse url: {0}")]
Parse(#[from] url::ParseError),
/// The URL's scheme could not be normalized.
#[error("failed to normalize url scheme")]
Scheme,
/// The URL's host could not be normalized; carries the underlying parse error.
#[error("failed to normalize url host: {0}")]
Host(url::ParseError),
/// The URL's port could not be normalized.
#[error("failed to normalize url port")]
Port,
}

/// Details of an external (off-network) link, as built by `ExternalLinkDetails::from_url`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExternalLinkDetails {
/// Identifier derived by hashing `normalized_url` (truncated hex digest).
pub id: String,
/// The URL exactly as it was supplied by the caller.
pub original_url: String,
/// The URL after normalization; this is the value the id is computed from.
pub normalized_url: String,
/// Scheme of the normalized URL.
pub scheme: String,
/// Host of the normalized URL, or `None` when the URL has no host.
pub host: Option<String>,
/// Creation timestamp supplied by the caller (unit is not visible here — TODO confirm).
pub created_at: i64,
}

impl ExternalLinkDetails {
pub fn from_url(raw_url: &str, created_at: i64) -> Result<Self, ExternalLinkError> {
let normalized = NormalizedUrl::new(raw_url)?;
let id = hash_to_hex(&normalized.normalized_url);

Ok(Self {
id,
original_url: raw_url.to_string(),
normalized_url: normalized.normalized_url,
scheme: normalized.scheme,
host: normalized.host,
created_at,
})
}

pub async fn upsert(&self) -> Result<(), DynError> {
self.put_index_json(&[self.id.as_str()], None, None).await
}

pub async fn get(id: &str) -> Result<Option<Self>, DynError> {
Self::try_from_index_json(&[id], None).await
}
}

/// Index integration for external links: only the key prefix is customised here.
#[async_trait::async_trait]
impl RedisOps for ExternalLinkDetails {
    /// Key prefix used for `ExternalLinkDetails` entries.
    async fn prefix() -> String {
        String::from(EXTERNAL_LINK_KEY_PREFIX)
    }
}

/// Result of normalizing a raw URL; produced by `NormalizedUrl::new`.
struct NormalizedUrl {
/// Full serialized URL after normalization.
normalized_url: String,
/// Scheme of the normalized URL.
scheme: String,
/// Host of the normalized URL, or `None` when the URL has no host.
host: Option<String>,
}

impl NormalizedUrl {
    /// Parse `raw_url` and reduce it to a canonical form.
    ///
    /// Normalization steps:
    /// * the fragment is removed;
    /// * scheme and host are lower-cased;
    /// * a port equal to the scheme's default is omitted, while any other
    ///   explicit port is preserved.
    ///
    /// The returned `host` is the bare host: for IPv6 literals the surrounding
    /// `[` `]` are stripped (they remain in `normalized_url`).
    ///
    /// # Errors
    /// * [`ExternalLinkError::Parse`] when `raw_url` is not a valid URL.
    /// * [`ExternalLinkError::Scheme`] / [`ExternalLinkError::Host`] when the
    ///   lower-cased scheme or host is rejected by the URL setter.
    fn new(raw_url: &str) -> Result<Self, ExternalLinkError> {
        let mut url = Url::parse(raw_url)?;

        url.set_fragment(None);

        // Defensive: the parser normally lower-cases the scheme already.
        let lower_scheme = url.scheme().to_ascii_lowercase();
        if lower_scheme != url.scheme() {
            url.set_scheme(&lower_scheme)
                .map_err(|_| ExternalLinkError::Scheme)?;
        }

        // No explicit port handling on purpose. `Url::port()` is `None` whenever
        // the port equals the scheme's default, so the parser has already dropped
        // default ports. Comparing `port()` against `port_or_known_default()` is
        // always true for an explicit port (the latter returns that same port), and
        // clearing it on that basis would strip every non-default port and make
        // e.g. `https://host:8443/` collide with `https://host/`.

        // Hosts of non-special schemes are not case-folded by the parser.
        if let Some(host) = url.host_str() {
            let lower = host.to_ascii_lowercase();
            if lower != host {
                url.set_host(Some(&lower))
                    .map_err(ExternalLinkError::Host)?;
            }
        }

        // `host_str()` keeps the brackets around IPv6 literals; expose the bare address.
        let host = url.host_str().map(|value| {
            value
                .strip_prefix('[')
                .and_then(|inner| inner.strip_suffix(']'))
                .unwrap_or(value)
                .to_string()
        });
        let scheme = url.scheme().to_string();
        let normalized_url = url.to_string();

        Ok(Self {
            normalized_url,
            scheme,
            host,
        })
    }
}

/// Hash `value` with BLAKE3 and return the leading `EXTERNAL_LINK_HASH_LEN`
/// characters of the hex-encoded digest.
fn hash_to_hex(value: &str) -> String {
    let digest: Hash = blake3::hash(value.as_bytes());
    // Hex output is ASCII, so taking characters is the same as truncating bytes.
    digest
        .to_hex()
        .chars()
        .take(EXTERNAL_LINK_HASH_LEN)
        .collect()
}

#[cfg(test)]
mod tests {
    use super::ExternalLinkDetails;

    /// Upper-case scheme and host, a default port and a fragment must all be
    /// absent from the normalized URL, while the original is kept verbatim.
    #[test]
    fn normalizes_scheme_host_and_fragment() {
        let link =
            ExternalLinkDetails::from_url("HTTP://Example.com:80/path#section", 1_000).unwrap();

        assert_eq!(link.scheme, "http");
        assert_eq!(link.host.as_deref(), Some("example.com"));
        assert_eq!(link.normalized_url, "http://example.com/path");
        assert_eq!(link.original_url, "HTTP://Example.com:80/path#section");
        assert_eq!(link.created_at, 1_000);
    }

    /// A port that is not the scheme default must survive normalization.
    #[test]
    fn keeps_non_default_ports() {
        let link = ExternalLinkDetails::from_url("https://example.com:8443/", 0).unwrap();

        assert_eq!(link.normalized_url, "https://example.com:8443/");
    }

    /// IPv6 hosts are lower-cased; `host` holds the bare address without brackets.
    #[test]
    fn normalizes_ipv6_hosts() {
        let link = ExternalLinkDetails::from_url("http://[2001:DB8::1]:80/", 0).unwrap();

        assert_eq!(link.host.as_deref(), Some("2001:db8::1"));
        assert_eq!(link.normalized_url, "http://[2001:db8::1]/");
    }
}
3 changes: 3 additions & 0 deletions nexus-common/src/models/link/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod external;

pub use external::{ExternalLinkDetails, ExternalLinkError};
1 change: 1 addition & 0 deletions nexus-common/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod bootstrap;
pub mod file;
pub mod follow;
pub mod homeserver;
pub mod link;
pub mod notification;
pub mod post;
pub mod tag;
Expand Down
Loading