Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions clickplanet-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ log = "0.4.22"
url = "2.5.4"
clap = { workspace = true, features = ["derive", "env"] }
tower-http = { version="0.6.2", features = ["cors", "trace"]}
http = "1.2.0"
http-body-util = "0.1.2"
hyper = "1.5.1"

[dev-dependencies]
testcontainers = { version = "0.23.1" }
Expand Down
76 changes: 57 additions & 19 deletions clickplanet-server/src/click_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ use axum::{
routing::get,
Router,
};
use axum::extract::WebSocketUpgrade;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Method, Request};
use axum::serve::Serve;
use http_body_util::Full;
use axum::body::{to_bytes, Body};
use axum::http::header::ACCEPT;
use axum::response::Response;

use bytes::Bytes;
use serde::{Deserialize, Serialize};
Expand All @@ -29,10 +37,7 @@ use base64::{encode};
use clap::Parser;
use futures_util::{SinkExt, StreamExt};
use std::{time::Duration};
use axum::extract::WebSocketUpgrade;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Method, Request};
use axum::serve::Serve;

use prost::Message;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
Expand All @@ -41,6 +46,8 @@ use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use clickplanet_proto::clicks::{Click, UpdateNotification};
use clickplanet_proto::clicks::{LeaderboardResponse, LeaderboardEntry};
use bytes::BytesMut;


use crate::click_persistence::{ClickRepository, LeaderboardRepository, LeaderboardOnClicks, LeaderboardMaintainer};
use crate::in_memory_click_persistence::{PapayaClickRepository};
Expand All @@ -49,6 +56,14 @@ use crate::ownership_service::OwnershipUpdateService;
use crate::redis_click_persistence::{RedisClickRepository};
use crate::telemetry::{init_telemetry, TelemetryConfig};



#[derive(Debug)]
enum AcceptedFormat {
Protobuf,
Json,
}

#[derive(Debug, Serialize, Deserialize)]
struct ClickPayload {
data: Vec<u8>,
Expand Down Expand Up @@ -140,12 +155,12 @@ async fn run(args: &Args) -> Result<(), Box<dyn std::error::Error>> {

let app = Router::new()
.route("/api/click", post(handle_click))
.route("/v2/rpc/click", post(handle_click))
.route("/api/ownerships-by-batch", post(handle_get_ownerships_by_batch))
.route("/ws/listen", get(handle_ws_upgrade))
.route("/v2/rpc/click", post(handle_click))
.route("/v2/rpc/ownerships-by-batch", post(handle_get_ownerships_by_batch))
.route("/v2/rpc/ownerships", get(handle_get_ownerships))
.route("/v2/rpc/leaderboard", get(handle_get_leaderboard))
.route("/ws/listen", get(handle_ws_upgrade))
.route("/v2/ws/listen", get(handle_ws_upgrade))
.layer(
CorsLayer::new()
Expand Down Expand Up @@ -345,7 +360,15 @@ async fn handle_ws_connection<T: ClickRepository>(socket: WebSocket, state: AppS

async fn handle_get_leaderboard<T: ClickRepository>(
State(state): State<AppState<T>>,
) -> Result<Json<Value>, StatusCode> {
headers: axum::http::HeaderMap,
) -> Result<Response<Full<Bytes>>, StatusCode> {
let format = match headers.get(ACCEPT) {
Some(accept) if accept.to_str().map(|s| s.contains("application/protobuf")).unwrap_or(false) => {
AcceptedFormat::Protobuf
},
_ => AcceptedFormat::Json
};

let leaderboard_data = tokio::time::timeout(
Duration::from_secs(5),
state.leaderboard_repo.leaderboard(),
Expand Down Expand Up @@ -376,16 +399,31 @@ async fn handle_get_leaderboard<T: ClickRepository>(
entries.sort_by(|a, b| b.score.cmp(&a.score));
response.entries = entries;

let mut response_bytes = Vec::new();
response
.encode(&mut response_bytes)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

let base64_data = encode(&response_bytes);

let payload = json!({
"data": base64_data,
});

Ok(axum::Json(payload))
match format {
AcceptedFormat::Protobuf => {
let mut buf = BytesMut::new();
response.encode(&mut buf)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

Ok(Response::builder()
.header("Content-Type", "application/protobuf")
.body(Full::new(buf.freeze()))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?)
},
AcceptedFormat::Json => {
let json_response = json!({
"entries": response.entries.iter().map(|entry| {
json!({
"country_id": entry.country_id,
"score": entry.score,
})
}).collect::<Vec<_>>()
});

Ok(Response::builder()
.header("Content-Type", "application/json")
.body(Full::new(Bytes::from(json_response.to_string())))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?)
}
}
}
2 changes: 1 addition & 1 deletion clickplanet-server/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn init_telemetry(config: TelemetryConfig) -> Result<(), Box<dyn std::

// Set up filter based on RUST_LOG env var, defaulting to info
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("debug"));
.unwrap_or_else(|_| EnvFilter::new("info"));

// Combine both layers
let subscriber = Registry::default()
Expand Down
22 changes: 17 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ services:
- JS_STORE_DIR=/data/jetstream
networks:
- app-network
deploy:
resources:
limits:
cpus: '2.0'
healthcheck:
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8222/healthz"]
interval: 5s
Expand Down Expand Up @@ -54,17 +58,25 @@ services:
click-server:
image: clickplanet/click-server:latest
environment:
- RUST_LOG=debug
- RUST_LOG=info
command: [
"--nats-url", "nats://nats:4222",
"--redis-url", "redis://redis:6379",
"--otlp-endpoint", "jaeger:4317",
"--otlp-endpoint", "http://jaeger:4317",
"--service-name", "click-server"
]
ports:
- "3000:3000"
networks:
- app-network
deploy:
resources:
limits:
cpus: '4.0'
memory: '2G'
reservations:
memory: '2G'
cpus: '4.0'
depends_on:
redis:
condition: service_healthy
Expand All @@ -76,11 +88,11 @@ services:
state-click-persister:
image: clickplanet/state-click-persister:latest
environment:
- RUST_LOG=debug
- RUST_LOG=info
command: [
"--nats-url", "nats://nats:4222",
"--redis-url", "redis://redis:6379",
"--otlp-endpoint", "jaeger:4317",
"--otlp-endpoint", "http://jaeger:4317",
"--service-name", "click-persister",
"--concurrent-processors", "8",
"--ack-wait-secs", "10"
Expand All @@ -98,7 +110,7 @@ services:
tile-syncer:
image: clickplanet/tile-syncer:latest
environment:
- RUST_LOG=debug
- RUST_LOG=info
command: [
"--prod-host", "clickplanet.lol",
"--local-host", "click-server", # Use the service name as host
Expand Down