From 83c2f6a060850090c65c358af2dc973aefa75d5a Mon Sep 17 00:00:00 2001 From: Luca Date: Fri, 20 Mar 2026 13:24:17 +0100 Subject: [PATCH 1/4] feat: add new /v1/error route --- src/batch_queue/mod.rs | 3 +- src/handler/collect.rs | 3 +- src/handler/error.rs | 85 ++++++++++++++++++++++++++++++++++++++++++ src/handler/mod.rs | 12 ++++-- src/handler/web.rs | 3 +- src/main.rs | 1 + src/tinybird.rs | 3 +- 7 files changed, 103 insertions(+), 7 deletions(-) create mode 100644 src/handler/error.rs diff --git a/src/batch_queue/mod.rs b/src/batch_queue/mod.rs index 1a60c88..25b2d09 100644 --- a/src/batch_queue/mod.rs +++ b/src/batch_queue/mod.rs @@ -1108,10 +1108,11 @@ mod tests { hash: "hash".to_string(), error_hash: "error-hash".to_string(), count: 3, - data_entry_id: Uuid::new_v4(), + data_entry_id: Some(Uuid::new_v4()), session_id: None, identity_key: None, build_id: None, + context: None, created_at: Utc::now(), }, tracking: None, diff --git a/src/handler/collect.rs b/src/handler/collect.rs index 3cc35ff..b4c8004 100644 --- a/src/handler/collect.rs +++ b/src/handler/collect.rs @@ -127,9 +127,10 @@ pub async fn collect( if let Err(e) = insert_error_entries( &state.batch_queue, ctx.project_id, - data_entry_id, + Some(data_entry_id), error, identity_key, + None, Some(tracking_ctx.clone()), ) .await diff --git a/src/handler/error.rs b/src/handler/error.rs new file mode 100644 index 0000000..3518b2a --- /dev/null +++ b/src/handler/error.rs @@ -0,0 +1,85 @@ +use super::{ + check_ip_allowed, error_response, get_authorization, get_client_ip, insert_error_entries, + load_project_context, resolve_identity_key, success_response, +}; +use crate::batch_queue::TrackingContext; +use crate::models::{AppState, ErrorTracking}; +use axum::Json; +use axum::extract::State; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::IntoResponse; +use serde::Deserialize; +use serde_json::Value; +use std::collections::HashMap; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct ErrorRequest { + errors: Vec, + #[serde(default)] + session_id: Option, + #[serde(default)] + build_id: Option, + #[serde(default)] + context: Option, +} + +pub async fn error( + State(state): State, + headers: HeaderMap, + Json(payload): Json, +) -> impl IntoResponse { + let token = match get_authorization(&headers) { + Some(t) => t, + None => return error_response(StatusCode::UNAUTHORIZED, "Unauthorized"), + }; + + let ctx = match load_project_context(&state.pool, &token).await { + Ok(ctx) => ctx, + Err(e) => return e, + }; + + let client_ip = get_client_ip(&headers); + if let Err(msg) = check_ip_allowed(&ctx.ip_rules, client_ip) { + return error_response(StatusCode::FORBIDDEN, msg); + } + + if !ctx.error_tracking_enabled { + return error_response(StatusCode::FORBIDDEN, "Error tracking is not enabled"); + } + + let tracking_ctx = TrackingContext { + owner_id: ctx.billing_customer_id.as_str().into(), + token: token.into(), + organization_id: ctx.organization_id.as_deref().map(Into::into), + }; + + let context = payload + .context + .map(|v| serde_json::to_string(&v).unwrap_or_else(|_| "{}".to_string())); + + for mut error in payload.errors { + if error.session_id.is_none() { + error.session_id = payload.session_id.clone(); + } + if error.build_id.is_none() { + error.build_id = payload.build_id.clone(); + } + let identity_key = resolve_identity_key(error.session_id.as_deref(), None); + if let Err(e) = insert_error_entries( + &state.batch_queue, + ctx.project_id, + None, + error, + identity_key, + context.clone(), + Some(tracking_ctx.clone()), + ) + .await + { + return e; + } + } + + success_response(HashMap::new()) +} diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 8f2ae5d..be9552a 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1,10 +1,12 @@ mod collect; +mod error; mod identify; mod replay; mod vitals; mod web; pub use collect::collect; +pub use error::error; pub use identify::identify; pub use replay::replay; pub use vitals::vitals; @@ -515,9 +517,10 @@ fn build_error_rows(error: &Error, errors: &mut Vec) -> String { pub async fn insert_error_entries( batch_queue: &BatchQueue, project_id: Uuid, - data_entry_id: Uuid, + data_entry_id: Option, data: ErrorTracking, identity_key: Option, + context: Option, tracking_ctx: Option, ) -> Result<(), HandlerResponse> { let mut error_rows = Vec::new(); @@ -545,6 +548,7 @@ pub async fn insert_error_entries( session_id: data.session_id.clone(), identity_key, build_id: data.build_id.clone(), + context, created_at, }; @@ -651,9 +655,10 @@ async fn process_collect_request( insert_error_entries( batch_queue, ctx.project_id, - data_entry_id, + Some(data_entry_id), error, identity_key, + None, Some(tracking_ctx.clone()), ) .await @@ -776,9 +781,10 @@ async fn process_web_request( insert_error_entries( batch_queue, ctx.project_id, - data_entry_id, + Some(data_entry_id), error, identity_key, + None, Some(tracking_ctx.clone()), ) .await diff --git a/src/handler/web.rs b/src/handler/web.rs index cc3862c..fa2de5c 100644 --- a/src/handler/web.rs +++ b/src/handler/web.rs @@ -217,9 +217,10 @@ pub async fn web( if let Err(e) = insert_error_entries( &state.batch_queue, ctx.project_id, - data_entry_id.unwrap_or_else(Uuid::new_v4), + data_entry_id, error, identity_key, + None, Some(tracking_ctx.clone()), ) .await diff --git a/src/main.rs b/src/main.rs index feee6c5..4f02475 100644 --- a/src/main.rs +++ b/src/main.rs @@ -138,6 +138,7 @@ async fn main() { .route("/v1/web", post(handler::web)) .route("/v1/identify", post(handler::identify)) .route("/v1/vitals", post(handler::vitals)) + .route("/v1/error", post(handler::error)) .route("/v1/replay", post(handler::replay)) .layer(axum::middleware::from_fn(track_metrics)) .layer(RequestDecompressionLayer::new()) diff --git a/src/tinybird.rs b/src/tinybird.rs index 1a2bd14..0948a36 100644 --- a/src/tinybird.rs +++ b/src/tinybird.rs @@ -79,10 +79,11 @@ pub struct ErrorTrackingRow { pub hash: String, pub error_hash: String, pub count: u32, - pub data_entry_id: Uuid, + pub data_entry_id: Option, pub session_id: Option, pub identity_key: Option, pub build_id: Option, + pub context: Option, #[serde(with = "chrono::serde::ts_milliseconds")] pub created_at: DateTime, } From ad06a8d357f18809b8f760132588baaad47ddc07 Mon Sep 17 00:00:00 2001 From: Luca Date: Tue, 24 Mar 2026 16:38:15 +0100 Subject: [PATCH 2/4] refactor: better db structure --- src/batch_queue/mod.rs | 5 +++-- src/handler/collect.rs | 6 ++++++ src/handler/error.rs | 1 + src/handler/mod.rs | 9 +++++++++ src/handler/web.rs | 1 + src/tinybird.rs | 3 ++- 6 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/batch_queue/mod.rs b/src/batch_queue/mod.rs index 25b2d09..2b93a21 100644 --- a/src/batch_queue/mod.rs +++ b/src/batch_queue/mod.rs @@ -100,7 +100,7 @@ impl QueuedEvent { QueuedEvent::WebEvent { .. } => "web_events", QueuedEvent::ModsEvent { .. } => "mods_events", QueuedEvent::Error(_) => "errors", - QueuedEvent::ErrorTracking { .. } => "error_occurences", + QueuedEvent::ErrorTracking { .. } => "error_occurences_v2", QueuedEvent::WebVital { .. } => "web_vitals", QueuedEvent::Replay { .. } => "session_replays", } @@ -1112,13 +1112,14 @@ mod tests { session_id: None, identity_key: None, build_id: None, + plugin_version: None, context: None, created_at: Utc::now(), }, tracking: None, } .datasource(), - "error_occurences" + "error_occurences_v2" ); assert_eq!( QueuedEvent::WebVital { diff --git a/src/handler/collect.rs b/src/handler/collect.rs index b4c8004..92f10c7 100644 --- a/src/handler/collect.rs +++ b/src/handler/collect.rs @@ -95,6 +95,11 @@ pub async fn collect( organization_id: ctx.organization_id.as_deref().map(Into::into), }; + let plugin_version = known + .get("plugin_version") + .and_then(|value| value.as_str()) + .map(str::to_owned); + let data_entry_id = match insert_mods_event( &state.batch_queue, ctx.project_id, @@ -130,6 +135,7 @@ pub async fn collect( Some(data_entry_id), error, identity_key, + plugin_version.clone(), None, Some(tracking_ctx.clone()), ) diff --git a/src/handler/error.rs b/src/handler/error.rs index 3518b2a..7a9615e 100644 --- a/src/handler/error.rs +++ b/src/handler/error.rs @@ -72,6 +72,7 @@ pub async fn error( None, error, identity_key, + None, context.clone(), Some(tracking_ctx.clone()), ) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index be9552a..59ef40b 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -520,6 +520,7 @@ pub async fn insert_error_entries( data_entry_id: Option, data: ErrorTracking, identity_key: Option, + plugin_version: Option, context: Option, tracking_ctx: Option, ) -> Result<(), HandlerResponse> { @@ -548,6 +549,7 @@ pub async fn insert_error_entries( session_id: data.session_id.clone(), identity_key, build_id: data.build_id.clone(), + plugin_version, context, created_at, }; @@ -628,6 +630,11 @@ async fn process_collect_request( organization_id: ctx.organization_id.as_deref().map(Into::into), }; + let plugin_version = known + .get("plugin_version") + .and_then(|value| value.as_str()) + .map(str::to_owned); + let data_entry_id = insert_mods_event( batch_queue, ctx.project_id, @@ -658,6 +665,7 @@ async fn process_collect_request( Some(data_entry_id), error, identity_key, + plugin_version.clone(), None, Some(tracking_ctx.clone()), ) @@ -785,6 +793,7 @@ async fn process_web_request( error, identity_key, None, + None, Some(tracking_ctx.clone()), ) .await diff --git a/src/handler/web.rs b/src/handler/web.rs index fa2de5c..70f3ac3 100644 --- a/src/handler/web.rs +++ b/src/handler/web.rs @@ -221,6 +221,7 @@ pub async fn web( error, identity_key, None, + None, Some(tracking_ctx.clone()), ) .await diff --git a/src/tinybird.rs b/src/tinybird.rs index 0948a36..30dc73b 100644 --- a/src/tinybird.rs +++ b/src/tinybird.rs @@ -83,6 +83,7 @@ pub struct ErrorTrackingRow { pub session_id: Option, pub identity_key: Option, pub build_id: Option, + pub plugin_version: Option, pub context: Option, #[serde(with = "chrono::serde::ts_milliseconds")] pub created_at: DateTime, @@ -244,7 +245,7 @@ impl TinybirdClient { &self, rows: &[&ErrorTrackingRow], ) -> Result<(), TinybirdError> { - self.send_batch("error_occurences", rows).await + self.send_batch("error_occurences_v2", rows).await } pub async fn insert_web_vitals(&self, rows: &[&WebVitalRow]) -> Result<(), TinybirdError> { From e435a76282a97f3a00eeae12d86ef1cba343d544 Mon Sep 17 00:00:00 2001 From: Luca Date: Tue, 24 Mar 2026 16:40:44 +0100 Subject: [PATCH 3/4] refactor: cleanup --- src/handler/collect.rs | 16 +++++++++------- src/handler/error.rs | 14 ++++++++------ src/handler/mod.rs | 40 ++++++++++++++++++++++++---------------- src/handler/web.rs | 18 ++++++++++-------- 4 files changed, 51 insertions(+), 37 deletions(-) diff --git a/src/handler/collect.rs b/src/handler/collect.rs index 92f10c7..59b8add 100644 --- a/src/handler/collect.rs +++ b/src/handler/collect.rs @@ -1,7 +1,7 @@ use super::{ - MODS_EVENT_FIELDS, check_ip_allowed, error_response, extract_known_fields, get_authorization, - get_client_ip, get_country, insert_error_entries, insert_mods_event, load_project_context, - resolve_identity_key, success_response, + ErrorEntryParams, MODS_EVENT_FIELDS, check_ip_allowed, error_response, extract_known_fields, + get_authorization, get_client_ip, get_country, insert_error_entries, insert_mods_event, + load_project_context, resolve_identity_key, success_response, }; use crate::batch_queue::{FailedRequest, RequestType, TrackingContext}; use crate::models::{AppState, Request}; @@ -134,10 +134,12 @@ pub async fn collect( ctx.project_id, Some(data_entry_id), error, - identity_key, - plugin_version.clone(), - None, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + plugin_version: plugin_version.clone(), + context: None, + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await { diff --git a/src/handler/error.rs b/src/handler/error.rs index 7a9615e..753536c 100644 --- a/src/handler/error.rs +++ b/src/handler/error.rs @@ -1,6 +1,6 @@ use super::{ - check_ip_allowed, error_response, get_authorization, get_client_ip, insert_error_entries, - load_project_context, resolve_identity_key, success_response, + ErrorEntryParams, check_ip_allowed, error_response, get_authorization, get_client_ip, + insert_error_entries, load_project_context, resolve_identity_key, success_response, }; use crate::batch_queue::TrackingContext; use crate::models::{AppState, ErrorTracking}; @@ -71,10 +71,12 @@ pub async fn error( ctx.project_id, None, error, - identity_key, - None, - context.clone(), - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + plugin_version: None, + context: context.clone(), + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 59ef40b..c3a7c99 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -39,6 +39,13 @@ static PROJECT_CACHE: LazyLock>> = LazyLock::n pub type HandlerResponse = (StatusCode, Json); +pub struct ErrorEntryParams { + pub identity_key: Option, + pub plugin_version: Option, + pub context: Option, + pub tracking_ctx: Option, +} + #[derive(Debug, Deserialize, Default)] pub struct EncodingQuery { pub encoding: Option, @@ -519,10 +526,7 @@ pub async fn insert_error_entries( project_id: Uuid, data_entry_id: Option, data: ErrorTracking, - identity_key: Option, - plugin_version: Option, - context: Option, - tracking_ctx: Option, + params: ErrorEntryParams, ) -> Result<(), HandlerResponse> { let mut error_rows = Vec::new(); let error_hash = build_error_rows(&data.error, &mut error_rows); @@ -547,17 +551,17 @@ pub async fn insert_error_entries( count: occurrence_count, data_entry_id, session_id: data.session_id.clone(), - identity_key, + identity_key: params.identity_key, build_id: data.build_id.clone(), - plugin_version, - context, + plugin_version: params.plugin_version, + context: params.context, created_at, }; batch_queue .queue_event(QueuedEvent::ErrorTracking { row: error_tracking, - tracking: tracking_ctx.clone(), + tracking: params.tracking_ctx, }) .await .map_err(|e| { @@ -664,10 +668,12 @@ async fn process_collect_request( ctx.project_id, Some(data_entry_id), error, - identity_key, - plugin_version.clone(), - None, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + plugin_version: plugin_version.clone(), + context: None, + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await .map_err(|_| "Failed to queue error".to_string())?; @@ -791,10 +797,12 @@ async fn process_web_request( ctx.project_id, Some(data_entry_id), error, - identity_key, - None, - None, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + plugin_version: None, + context: None, + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await .map_err(|_| "Failed to queue error".to_string())?; diff --git a/src/handler/web.rs b/src/handler/web.rs index 70f3ac3..2f58891 100644 --- a/src/handler/web.rs +++ b/src/handler/web.rs @@ -1,8 +1,8 @@ use super::{ - EncodingQuery, WEB_EVENT_FIELDS, check_ip_allowed, decompress_body, error_response, - extract_known_fields, get_authorization, get_client_ip, get_country, get_request_origin, - insert_error_entries, insert_web_event, load_project_context, resolve_identity_key, - success_response, validate_hostname, + EncodingQuery, ErrorEntryParams, WEB_EVENT_FIELDS, check_ip_allowed, decompress_body, + error_response, extract_known_fields, get_authorization, get_client_ip, get_country, + get_request_origin, insert_error_entries, insert_web_event, load_project_context, + resolve_identity_key, success_response, validate_hostname, }; use crate::batch_queue::{FailedRequest, RequestType, TrackingContext}; use crate::models::{AppState, ErrorTracking}; @@ -219,10 +219,12 @@ pub async fn web( ctx.project_id, data_entry_id, error, - identity_key, - None, - None, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + plugin_version: None, + context: None, + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await { From 38fb9caa924381c85d78df634baf95aacb881847 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 25 Mar 2026 11:57:57 +0100 Subject: [PATCH 4/4] update --- src/batch_queue/mod.rs | 36 +++++++---- src/handler/collect.rs | 15 +++-- src/handler/error.rs | 7 ++- src/handler/mod.rs | 131 ++++++++++++++++++++++++++++++++++++++--- src/handler/web.rs | 16 +++-- src/tinybird.rs | 17 +++++- 6 files changed, 186 insertions(+), 36 deletions(-) diff --git a/src/batch_queue/mod.rs b/src/batch_queue/mod.rs index 2b93a21..d504e11 100644 --- a/src/batch_queue/mod.rs +++ b/src/batch_queue/mod.rs @@ -78,7 +78,7 @@ pub enum QueuedEvent { }, Error(ErrorRow), ErrorTracking { - row: ErrorTrackingRow, + row: Box, #[serde(skip_serializing_if = "Option::is_none")] tracking: Option, }, @@ -157,7 +157,7 @@ impl InMemoryBatch { QueuedEvent::ModsEvent { row, tracking } => self.mods_events.push((row, tracking)), QueuedEvent::Error(e) => self.errors.push(e), QueuedEvent::ErrorTracking { row, tracking } => { - self.error_trackings.push((row, tracking)) + self.error_trackings.push((*row, tracking)) } QueuedEvent::WebVital { row, tracking } => self.web_vitals.push((row, tracking)), QueuedEvent::Replay { row, tracking } => self.replays.push((row, tracking)), @@ -180,11 +180,12 @@ impl InMemoryBatch { .map(|(row, tracking)| QueuedEvent::ModsEvent { row, tracking }), ); result.extend(self.errors.into_iter().map(QueuedEvent::Error)); - result.extend( - self.error_trackings - .into_iter() - .map(|(row, tracking)| QueuedEvent::ErrorTracking { row, tracking }), - ); + result.extend(self.error_trackings.into_iter().map(|(row, tracking)| { + QueuedEvent::ErrorTracking { + row: Box::new(row), + tracking, + } + })); result.extend( self.web_vitals .into_iter() @@ -1102,7 +1103,7 @@ mod tests { ); assert_eq!( QueuedEvent::ErrorTracking { - row: ErrorTrackingRow { + row: Box::new(ErrorTrackingRow { id: Uuid::new_v4(), project_id: Uuid::new_v4(), hash: "hash".to_string(), @@ -1112,10 +1113,25 @@ mod tests { session_id: None, identity_key: None, build_id: None, - plugin_version: None, + plugin_version: String::new(), + source_kind: "error".to_string(), + entry_session_id: String::new(), + entry_country: String::new(), + entry_browser: String::new(), + entry_device: String::new(), + entry_os: String::new(), + player_count: None, + online_mode: None, + minecraft_version: String::new(), + server_type: String::new(), + java_version: String::new(), + os_version: String::new(), + os_arch: String::new(), + core_count: None, + entry_data: "{}".to_string(), context: None, created_at: Utc::now(), - }, + }), tracking: None, } .datasource(), diff --git a/src/handler/collect.rs b/src/handler/collect.rs index 59b8add..0d6a3d3 100644 --- a/src/handler/collect.rs +++ b/src/handler/collect.rs @@ -1,7 +1,8 @@ use super::{ - ErrorEntryParams, MODS_EVENT_FIELDS, check_ip_allowed, error_response, extract_known_fields, - get_authorization, get_client_ip, get_country, insert_error_entries, insert_mods_event, - load_project_context, resolve_identity_key, success_response, + ErrorEntryParams, MODS_EVENT_FIELDS, build_mods_error_entry_details, check_ip_allowed, + error_response, extract_known_fields, get_authorization, get_client_ip, get_country, + insert_error_entries, insert_mods_event, load_project_context, resolve_identity_key, + success_response, }; use crate::batch_queue::{FailedRequest, RequestType, TrackingContext}; use crate::models::{AppState, Request}; @@ -95,10 +96,8 @@ pub async fn collect( organization_id: ctx.organization_id.as_deref().map(Into::into), }; - let plugin_version = known - .get("plugin_version") - .and_then(|value| value.as_str()) - .map(str::to_owned); + let error_entry_details = + build_mods_error_entry_details(server_id, country.as_deref(), &known, &valid_custom); let data_entry_id = match insert_mods_event( &state.batch_queue, @@ -136,8 +135,8 @@ pub async fn collect( error, ErrorEntryParams { identity_key, - plugin_version: plugin_version.clone(), context: None, + details: error_entry_details.clone(), tracking_ctx: Some(tracking_ctx.clone()), }, ) diff --git a/src/handler/error.rs b/src/handler/error.rs index 753536c..8ee80b3 100644 --- a/src/handler/error.rs +++ b/src/handler/error.rs @@ -1,6 +1,7 @@ use super::{ - ErrorEntryParams, check_ip_allowed, error_response, get_authorization, get_client_ip, - insert_error_entries, load_project_context, resolve_identity_key, success_response, + ErrorEntryDetails, ErrorEntryParams, check_ip_allowed, error_response, get_authorization, + get_client_ip, insert_error_entries, load_project_context, resolve_identity_key, + success_response, }; use crate::batch_queue::TrackingContext; use crate::models::{AppState, ErrorTracking}; @@ -73,8 +74,8 @@ pub async fn error( error, ErrorEntryParams { identity_key, - plugin_version: None, context: context.clone(), + details: ErrorEntryDetails::error_only(), tracking_ctx: Some(tracking_ctx.clone()), }, ) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index c3a7c99..6f6fe2f 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -41,11 +41,41 @@ pub type HandlerResponse = (StatusCode, Json); pub struct ErrorEntryParams { pub identity_key: Option, - pub plugin_version: Option, pub context: Option, + pub details: ErrorEntryDetails, pub tracking_ctx: Option, } +#[derive(Clone, Default)] +pub struct ErrorEntryDetails { + pub plugin_version: String, + pub source_kind: String, + pub entry_session_id: String, + pub entry_country: String, + pub entry_browser: String, + pub entry_device: String, + pub entry_os: String, + pub player_count: Option, + pub online_mode: Option, + pub minecraft_version: String, + pub server_type: String, + pub java_version: String, + pub os_version: String, + pub os_arch: String, + pub core_count: Option, + pub entry_data: String, +} + +impl ErrorEntryDetails { + pub fn error_only() -> Self { + Self { + source_kind: "error".to_string(), + entry_data: "{}".to_string(), + ..Self::default() + } + } +} + #[derive(Debug, Deserialize, Default)] pub struct EncodingQuery { pub encoding: Option, @@ -352,6 +382,66 @@ fn to_custom_json(data: &HashMap) -> String { } } +fn read_known_string(known: &HashMap, key: &str) -> String { + known + .get(key) + .and_then(Value::as_str) + .map(str::to_owned) + .unwrap_or_default() +} + +fn read_known_f64(known: &HashMap, key: &str) -> Option { + known.get(key).and_then(Value::as_f64) +} + +fn read_known_bool(known: &HashMap, key: &str) -> Option { + known.get(key).and_then(Value::as_bool) +} + +pub fn build_web_error_entry_details( + session_id: Option<&str>, + country: Option<&str>, + known: &HashMap, + custom: &HashMap, +) -> ErrorEntryDetails { + ErrorEntryDetails { + source_kind: "web-analytics".to_string(), + entry_session_id: session_id.unwrap_or_default().to_string(), + entry_country: country.unwrap_or_default().to_string(), + entry_browser: read_known_string(known, "browser"), + entry_device: read_known_string(known, "device"), + entry_os: read_known_string(known, "os"), + os_version: read_known_string(known, "os_version"), + entry_data: to_custom_json(custom), + ..ErrorEntryDetails::default() + } +} + +pub fn build_mods_error_entry_details( + server_id: Uuid, + country: Option<&str>, + known: &HashMap, + custom: &HashMap, +) -> ErrorEntryDetails { + ErrorEntryDetails { + plugin_version: read_known_string(known, "plugin_version"), + source_kind: "minecraft-plugin".to_string(), + entry_session_id: server_id.to_string(), + entry_country: country.unwrap_or_default().to_string(), + entry_os: read_known_string(known, "os_name"), + player_count: read_known_f64(known, "player_count"), + online_mode: read_known_bool(known, "online_mode"), + minecraft_version: read_known_string(known, "minecraft_version"), + server_type: read_known_string(known, "server_type"), + java_version: read_known_string(known, "java_version"), + os_version: read_known_string(known, "os_version"), + os_arch: read_known_string(known, "os_arch"), + core_count: read_known_f64(known, "core_count"), + entry_data: to_custom_json(custom), + ..ErrorEntryDetails::default() + } +} + /// Known internal fields for web_events row. These are extracted before /// datasource validation so they always reach the Tinybird row. const WEB_EVENT_FIELDS: &[&str] = &[ @@ -553,14 +643,29 @@ pub async fn insert_error_entries( session_id: data.session_id.clone(), identity_key: params.identity_key, build_id: data.build_id.clone(), - plugin_version: params.plugin_version, + plugin_version: params.details.plugin_version, + source_kind: params.details.source_kind, + entry_session_id: params.details.entry_session_id, + entry_country: params.details.entry_country, + entry_browser: params.details.entry_browser, + entry_device: params.details.entry_device, + entry_os: params.details.entry_os, + player_count: params.details.player_count, + online_mode: params.details.online_mode, + minecraft_version: params.details.minecraft_version, + server_type: params.details.server_type, + java_version: params.details.java_version, + os_version: params.details.os_version, + os_arch: params.details.os_arch, + core_count: params.details.core_count, + entry_data: params.details.entry_data, context: params.context, created_at, }; batch_queue .queue_event(QueuedEvent::ErrorTracking { - row: error_tracking, + row: Box::new(error_tracking), tracking: params.tracking_ctx, }) .await @@ -634,10 +739,12 @@ async fn process_collect_request( organization_id: ctx.organization_id.as_deref().map(Into::into), }; - let plugin_version = known - .get("plugin_version") - .and_then(|value| value.as_str()) - .map(str::to_owned); + let error_entry_details = build_mods_error_entry_details( + server_id, + request.country.as_deref(), + &known, + &valid_custom, + ); let data_entry_id = insert_mods_event( batch_queue, @@ -670,8 +777,8 @@ async fn process_collect_request( error, ErrorEntryParams { identity_key, - plugin_version: plugin_version.clone(), context: None, + details: error_entry_details.clone(), tracking_ctx: Some(tracking_ctx.clone()), }, ) @@ -765,6 +872,12 @@ async fn process_web_request( organization_id: ctx.organization_id.as_deref().map(Into::into), }; let fallback_identity = resolved_user_id.to_string(); + let error_entry_details = build_web_error_entry_details( + parsed.session_id.as_deref(), + request.country.as_deref(), + &known, + &valid_custom, + ); let data_entry_id = insert_web_event( batch_queue, @@ -799,8 +912,8 @@ async fn process_web_request( error, ErrorEntryParams { identity_key, - plugin_version: None, context: None, + details: error_entry_details.clone(), tracking_ctx: Some(tracking_ctx.clone()), }, ) diff --git a/src/handler/web.rs b/src/handler/web.rs index 2f58891..a96f888 100644 --- a/src/handler/web.rs +++ b/src/handler/web.rs @@ -1,8 +1,8 @@ use super::{ - EncodingQuery, ErrorEntryParams, WEB_EVENT_FIELDS, check_ip_allowed, decompress_body, - error_response, extract_known_fields, get_authorization, get_client_ip, get_country, - get_request_origin, insert_error_entries, insert_web_event, load_project_context, - resolve_identity_key, success_response, validate_hostname, + EncodingQuery, ErrorEntryParams, WEB_EVENT_FIELDS, build_web_error_entry_details, + check_ip_allowed, decompress_body, error_response, extract_known_fields, get_authorization, + get_client_ip, get_country, get_request_origin, insert_error_entries, insert_web_event, + load_project_context, resolve_identity_key, success_response, validate_hostname, }; use crate::batch_queue::{FailedRequest, RequestType, TrackingContext}; use crate::models::{AppState, ErrorTracking}; @@ -180,6 +180,12 @@ pub async fn web( organization_id: ctx.organization_id.as_deref().map(Into::into), }; let fallback_identity = resolved_user_id.to_string(); + let error_entry_details = build_web_error_entry_details( + session_id.as_deref(), + country.as_deref(), + &known, + &valid_custom, + ); let data_entry_id = if is_debounced { None @@ -221,8 +227,8 @@ pub async fn web( error, ErrorEntryParams { identity_key, - plugin_version: None, context: None, + details: error_entry_details.clone(), tracking_ctx: Some(tracking_ctx.clone()), }, ) diff --git a/src/tinybird.rs b/src/tinybird.rs index 30dc73b..a3aa121 100644 --- a/src/tinybird.rs +++ b/src/tinybird.rs @@ -83,7 +83,22 @@ pub struct ErrorTrackingRow { pub session_id: Option, pub identity_key: Option, pub build_id: Option, - pub plugin_version: Option, + pub plugin_version: String, + pub source_kind: String, + pub entry_session_id: String, + pub entry_country: String, + pub entry_browser: String, + pub entry_device: String, + pub entry_os: String, + pub player_count: Option, + pub online_mode: Option, + pub minecraft_version: String, + pub server_type: String, + pub java_version: String, + pub os_version: String, + pub os_arch: String, + pub core_count: Option, + pub entry_data: String, pub context: Option, #[serde(with = "chrono::serde::ts_milliseconds")] pub created_at: DateTime,