api/Cargo.toml: 2 additions & 1 deletion
@@ -1,6 +1,6 @@
[package]
name = "chai-api"
version = "1.2.1"
version = "1.3.0"
edition = "2021"
authors = ["Jacob Heider <jacob@pkgx.dev>"]
description = "A simple REST API for the CHAI database"
@@ -25,3 +25,4 @@ tokio-postgres = { version = "0.7", features = [
] }
deadpool-postgres = "0.10.0"
url = "2.5.2"
dashmap = "6.1.0"
api/src/app_state.rs: 26 additions & 0 deletions
@@ -1,7 +1,33 @@
use dashmap::DashMap;
use deadpool_postgres::Pool;
use serde_json::Value;
use std::sync::Arc;
use std::time::{Duration, Instant};
use uuid::Uuid;

const TTL: Duration = Duration::from_secs(3600); // 1 hour

#[derive(Clone)]
pub struct ProjectCacheEntry {
pub data: Arc<Value>,
pub created_at: Instant,
}

impl ProjectCacheEntry {
pub fn new(data: Value) -> Self {
Self {
data: Arc::new(data),
created_at: Instant::now(),
}
}

pub fn is_expired(&self) -> bool {
self.created_at.elapsed() > TTL
}
}

pub struct AppState {
pub pool: Pool,
pub tables: Arc<Vec<String>>,
pub project_cache: Arc<DashMap<Uuid, ProjectCacheEntry>>,
}
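
The entry type above is small enough to exercise in isolation. A minimal sketch (not part of this PR) of how ProjectCacheEntry behaves; cache_entry_demo and the JSON payload are made up for illustration:

fn cache_entry_demo() {
    // `new` stamps `created_at` with `Instant::now()`.
    let entry = ProjectCacheEntry::new(serde_json::json!({
        "projectId": "00000000-0000-0000-0000-000000000000",
        "teaRank": "42",
    }));

    // Fresh entries stay valid until the 1-hour TTL elapses.
    assert!(!entry.is_expired());

    // Cloning shares the payload: only the Arc pointer is copied, not the JSON.
    let shared = entry.data.clone();
    assert_eq!(shared["teaRank"], "42");
}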
api/src/handlers.rs: 73 additions & 7 deletions
@@ -1,11 +1,12 @@
use actix_web::{get, post, web, HttpResponse, Responder};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio_postgres::error::SqlState;
use uuid::Uuid;

use crate::app_state::AppState;
-use crate::utils::{get_column_names, rows_to_json, Pagination};
+use crate::utils::{get_cached_projects, get_column_names, rows_to_json, Pagination};

const RESPONSE_LIMIT: i64 = 1000;

@@ -317,7 +318,7 @@ pub async fn list_projects_by_id(
WHERE cp2.canon_id = c.id
) AS "packageManagers"
FROM canons c
-JOIN urls u_homepage ON u_homepage.id = c.url_id
+JOIN urls u_homepage ON u_homepage.id = c.url_id
JOIN canon_packages cp ON cp.canon_id = c.id
JOIN package_urls pu ON pu.package_id = cp.package_id
JOIN urls u_source ON pu.url_id = u_source.id
@@ -415,8 +416,10 @@ pub async fn get_leaderboard(
req: web::Json<LeaderboardRequest>,
data: web::Data<AppState>,
) -> impl Responder {
let limit = req.limit.clamp(1, RESPONSE_LIMIT);

let Some(project_ids) = req.project_ids.as_deref() else {
-return get_top_projects(data, req.limit).await;
+return get_top_projects(data, limit).await;
};

if project_ids.len() > RESPONSE_LIMIT as usize {
@@ -425,8 +428,16 @@
}));
}

-let limit = req.limit.clamp(1, RESPONSE_LIMIT);
// Get cached projects and identify missing ones
let (cached_projects, missing_ids) =
get_cached_projects(data.project_cache.clone(), project_ids);

// If we have all projects cached, return them sorted
if missing_ids.is_empty() {
return sort_truncate_and_return(cached_projects, limit);
}

// Query for missing projects
let query = r#"
SELECT *
FROM (
@@ -462,10 +473,35 @@
LIMIT $2"#;

match data.pool.get().await {
-Ok(client) => match client.query(query, &[&project_ids, &limit]).await {
+Ok(client) => match client.query(query, &[&missing_ids, &limit]).await {
Ok(rows) => {
-let json = rows_to_json(&rows);
-HttpResponse::Ok().json(json)
let fresh_projects = rows_to_json(&rows);

// Cache the fresh projects
for project in &fresh_projects {
if let Some(project_id) = project.get("projectId").and_then(|v| v.as_str()) {
if let Ok(uuid) = Uuid::parse_str(project_id) {
data.project_cache.insert(
uuid,
crate::app_state::ProjectCacheEntry::new(project.clone()),
);
} else {
log::warn!("Failed to parse project ID as UUID: {}", project_id);
}
} else {
log::warn!("No projectId found in project: {:?}", project);
}
}

// Combine cached and fresh projects - keep Arc<Value> for cached ones
let mut all_projects: Vec<Arc<Value>> = cached_projects;

// Convert fresh projects to Arc<Value> to match the type
let fresh_arcs: Vec<Arc<Value>> =
fresh_projects.into_iter().map(Arc::new).collect();
all_projects.extend(fresh_arcs);

sort_truncate_and_return(all_projects, limit)
}
Err(e) => {
log::error!("Database query error: {e}");
@@ -481,6 +517,36 @@ pub async fn get_leaderboard(
}
}

// Helper function to sort, truncate, and return the final response
fn sort_truncate_and_return(mut projects: Vec<Arc<Value>>, limit: i64) -> HttpResponse {

// Sort projects by teaRank (descending) - Arc<Value> derefs to Value
projects.sort_by(|a, b| {
let rank_a = a
.get("teaRank")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
let rank_b = b
.get("teaRank")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
rank_b.cmp(&rank_a)
});

// Apply limit
projects.truncate(limit as usize);

// Convert to Vec<Value> only for the final response - Arc<Value> doesn't implement Serialize
let final_projects: Vec<Value> = projects
.into_iter()
.map(|arc_val| (*arc_val).clone())
.collect();
HttpResponse::Ok().json(final_projects)
}

async fn get_top_projects(data: web::Data<AppState>, limit: i64) -> HttpResponse {
// get client
let Ok(client) = data.pool.get().await else {
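
The comparator in sort_truncate_and_return reads teaRank as a string-encoded integer, with a silent unwrap_or(0) fallback: anything missing or unparseable sinks to the bottom rather than erroring. A hedged test sketch (not in this PR) pinning that behavior down; it duplicates the comparison instead of calling the private helper:

#[cfg(test)]
mod leaderboard_sort_tests {
    use serde_json::{json, Value};

    // Mirrors the teaRank comparison inside `sort_truncate_and_return`.
    fn tea_rank(v: &Value) -> i64 {
        v.get("teaRank")
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse::<i64>().ok())
            .unwrap_or(0)
    }

    #[test]
    fn sorts_descending_and_sinks_unparseable_ranks() {
        let mut rows = vec![
            json!({ "projectId": "a", "teaRank": "10" }),
            json!({ "projectId": "b", "teaRank": "not-a-number" }),
            json!({ "projectId": "c", "teaRank": "25" }),
        ];
        rows.sort_by(|a, b| tea_rank(b).cmp(&tea_rank(a)));

        let order: Vec<&str> = rows
            .iter()
            .map(|r| r["projectId"].as_str().unwrap())
            .collect();
        assert_eq!(order, vec!["c", "a", "b"]); // "b" parses to 0 and lands last
    }
}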
api/src/main.rs: 4 additions & 0 deletions
@@ -5,6 +5,7 @@ mod logging;
mod utils;

use actix_web::{web, App, HttpServer};
use dashmap::DashMap;
use dotenv::dotenv;
use std::env;
use std::sync::Arc;
@@ -26,6 +27,8 @@ async fn main() -> std::io::Result<()> {
let bind_address = format!("{host}:{port}");

let (pool, tables) = db::initialize_db().await;
// Cache for project data to reduce database load on leaderboard routes
let project_cache = Arc::new(DashMap::new());

log::info!("Available tables: {tables:?}");
log::info!("Starting server at http://{bind_address}");
@@ -36,6 +39,7 @@
.app_data(web::Data::new(AppState {
pool: pool.clone(),
tables: Arc::clone(&tables),
project_cache: Arc::clone(&project_cache),
}))
// HEALTH
.service(heartbeat)
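
One consequence of the design worth noting: get_cached_projects treats expired entries as misses, but nothing ever removes them, so a project that is never requested again keeps its stale JSON in memory for the life of the process. If that ever matters, a periodic sweep is cheap with DashMap. A sketch, not part of this PR: spawn_cache_sweeper is a hypothetical helper, and it assumes the tokio runtime (with timers) that actix-web already runs on:

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use uuid::Uuid;

use crate::app_state::ProjectCacheEntry;

// Every 10 minutes, drop entries whose TTL has lapsed. `DashMap::retain`
// locks one shard at a time, so concurrent reads stay cheap.
fn spawn_cache_sweeper(cache: Arc<DashMap<Uuid, ProjectCacheEntry>>) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(600));
        loop {
            interval.tick().await;
            cache.retain(|_, entry| !entry.is_expired());
        }
    });
}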
api/src/utils.rs: 24 additions & 1 deletion
@@ -1,10 +1,12 @@
use actix_web::web::Query;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use dashmap::DashMap;
use serde_json::{json, Value};
use std::sync::Arc;
use tokio_postgres::{types::Type, Row};
use uuid::Uuid;

-use crate::handlers::PaginationParams;
+use crate::{app_state::ProjectCacheEntry, handlers::PaginationParams};

pub fn get_column_names(rows: &[Row]) -> Vec<String> {
if let Some(row) = rows.first() {
@@ -91,3 +93,24 @@ impl Pagination {
}
}
}

// Split the requested project IDs into fresh cached entries and IDs that must be fetched
pub fn get_cached_projects(
cache: Arc<DashMap<Uuid, ProjectCacheEntry>>,
project_ids: &[Uuid],
) -> (Vec<Arc<Value>>, Vec<Uuid>) {
let mut cached_projects = Vec::new();
let mut missing_ids = Vec::new();

for &project_id in project_ids {
if let Some(entry) = cache.get(&project_id) {
if !entry.is_expired() {
cached_projects.push(entry.data.clone());
continue;
}
}
missing_ids.push(project_id);
}

(cached_projects, missing_ids)
}
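
A test sketch (also not part of the PR) for the hit/miss split above; the expired entry is fabricated by back-dating created_at, which works because the struct fields are public:

#[cfg(test)]
mod cache_lookup_tests {
    use super::*;
    use std::time::{Duration, Instant};

    #[test]
    fn splits_fresh_hits_from_expired_and_absent_ids() {
        let cache = Arc::new(DashMap::new());
        let fresh_id = Uuid::new_v4();
        let expired_id = Uuid::new_v4();
        let absent_id = Uuid::new_v4();

        cache.insert(
            fresh_id,
            ProjectCacheEntry::new(json!({ "projectId": fresh_id.to_string() })),
        );
        // Back-date past the one-hour TTL so the lookup treats it as a miss.
        cache.insert(
            expired_id,
            ProjectCacheEntry {
                data: Arc::new(json!({ "projectId": expired_id.to_string() })),
                created_at: Instant::now() - Duration::from_secs(2 * 3600),
            },
        );

        let (hits, missing) = get_cached_projects(cache, &[fresh_id, expired_id, absent_id]);

        assert_eq!(hits.len(), 1); // only the fresh entry is served from cache
        assert_eq!(missing, vec![expired_id, absent_id]); // both fall through to the DB
    }
}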