diff --git a/Cargo.lock b/Cargo.lock index 7e3eae4..14f8bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -778,6 +778,9 @@ dependencies = [ "deadpool-redis", "futures", "futures-util", + "http 1.2.0", + "http-body-util", + "hyper", "log", "opentelemetry", "opentelemetry-otlp", diff --git a/clickplanet-server/Cargo.toml b/clickplanet-server/Cargo.toml index 961e894..82abf0f 100644 --- a/clickplanet-server/Cargo.toml +++ b/clickplanet-server/Cargo.toml @@ -31,6 +31,9 @@ log = "0.4.22" url = "2.5.4" clap = { workspace = true, features = ["derive", "env"] } tower-http = { version="0.6.2", features = ["cors", "trace"]} +http = "1.2.0" +http-body-util = "0.1.2" +hyper = "1.5.1" [dev-dependencies] testcontainers = { version = "0.23.1" } diff --git a/clickplanet-server/src/click_server.rs b/clickplanet-server/src/click_server.rs index 025da08..dbab6c4 100644 --- a/clickplanet-server/src/click_server.rs +++ b/clickplanet-server/src/click_server.rs @@ -17,6 +17,14 @@ use axum::{ routing::get, Router, }; +use axum::extract::WebSocketUpgrade; +use axum::http::header::CONTENT_TYPE; +use axum::http::{Method, Request}; +use axum::serve::Serve; +use http_body_util::Full; +use axum::body::{to_bytes, Body}; +use axum::http::header::ACCEPT; +use axum::response::Response; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -29,10 +37,7 @@ use base64::{encode}; use clap::Parser; use futures_util::{SinkExt, StreamExt}; use std::{time::Duration}; -use axum::extract::WebSocketUpgrade; -use axum::http::header::CONTENT_TYPE; -use axum::http::{Method, Request}; -use axum::serve::Serve; + use prost::Message; use tokio::sync::Mutex; use tokio::sync::broadcast; @@ -41,6 +46,8 @@ use tower_http::cors::{Any, CorsLayer}; use tower_http::trace::TraceLayer; use clickplanet_proto::clicks::{Click, UpdateNotification}; use clickplanet_proto::clicks::{LeaderboardResponse, LeaderboardEntry}; +use bytes::BytesMut; + use crate::click_persistence::{ClickRepository, LeaderboardRepository, LeaderboardOnClicks, LeaderboardMaintainer}; use crate::in_memory_click_persistence::{PapayaClickRepository}; @@ -49,6 +56,14 @@ use crate::ownership_service::OwnershipUpdateService; use crate::redis_click_persistence::{RedisClickRepository}; use crate::telemetry::{init_telemetry, TelemetryConfig}; + + +#[derive(Debug)] +enum AcceptedFormat { + Protobuf, + Json, +} + #[derive(Debug, Serialize, Deserialize)] struct ClickPayload { data: Vec, @@ -140,12 +155,12 @@ async fn run(args: &Args) -> Result<(), Box> { let app = Router::new() .route("/api/click", post(handle_click)) - .route("/v2/rpc/click", post(handle_click)) .route("/api/ownerships-by-batch", post(handle_get_ownerships_by_batch)) + .route("/ws/listen", get(handle_ws_upgrade)) + .route("/v2/rpc/click", post(handle_click)) .route("/v2/rpc/ownerships-by-batch", post(handle_get_ownerships_by_batch)) .route("/v2/rpc/ownerships", get(handle_get_ownerships)) .route("/v2/rpc/leaderboard", get(handle_get_leaderboard)) - .route("/ws/listen", get(handle_ws_upgrade)) .route("/v2/ws/listen", get(handle_ws_upgrade)) .layer( CorsLayer::new() @@ -345,7 +360,15 @@ async fn handle_ws_connection(socket: WebSocket, state: AppS async fn handle_get_leaderboard( State(state): State>, -) -> Result, StatusCode> { + headers: axum::http::HeaderMap, +) -> Result>, StatusCode> { + let format = match headers.get(ACCEPT) { + Some(accept) if accept.to_str().map(|s| s.contains("application/protobuf")).unwrap_or(false) => { + AcceptedFormat::Protobuf + }, + _ => AcceptedFormat::Json + }; + let leaderboard_data = tokio::time::timeout( Duration::from_secs(5), state.leaderboard_repo.leaderboard(), @@ -376,16 +399,31 @@ async fn handle_get_leaderboard( entries.sort_by(|a, b| b.score.cmp(&a.score)); response.entries = entries; - let mut response_bytes = Vec::new(); - response - .encode(&mut response_bytes) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let base64_data = encode(&response_bytes); - - let payload = json!({ - "data": base64_data, - }); - - Ok(axum::Json(payload)) + match format { + AcceptedFormat::Protobuf => { + let mut buf = BytesMut::new(); + response.encode(&mut buf) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(Response::builder() + .header("Content-Type", "application/protobuf") + .body(Full::new(buf.freeze())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?) + }, + AcceptedFormat::Json => { + let json_response = json!({ + "entries": response.entries.iter().map(|entry| { + json!({ + "country_id": entry.country_id, + "score": entry.score, + }) + }).collect::>() + }); + + Ok(Response::builder() + .header("Content-Type", "application/json") + .body(Full::new(Bytes::from(json_response.to_string()))) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?) + } + } } \ No newline at end of file diff --git a/clickplanet-server/src/telemetry.rs b/clickplanet-server/src/telemetry.rs index a4f880a..8f10717 100644 --- a/clickplanet-server/src/telemetry.rs +++ b/clickplanet-server/src/telemetry.rs @@ -49,7 +49,7 @@ pub async fn init_telemetry(config: TelemetryConfig) -> Result<(), Box