diff --git a/src/batch_queue/mod.rs b/src/batch_queue/mod.rs index 1a60c88..d504e11 100644 --- a/src/batch_queue/mod.rs +++ b/src/batch_queue/mod.rs @@ -78,7 +78,7 @@ pub enum QueuedEvent { }, Error(ErrorRow), ErrorTracking { - row: ErrorTrackingRow, + row: Box, #[serde(skip_serializing_if = "Option::is_none")] tracking: Option, }, @@ -100,7 +100,7 @@ impl QueuedEvent { QueuedEvent::WebEvent { .. } => "web_events", QueuedEvent::ModsEvent { .. } => "mods_events", QueuedEvent::Error(_) => "errors", - QueuedEvent::ErrorTracking { .. } => "error_occurences", + QueuedEvent::ErrorTracking { .. } => "error_occurences_v2", QueuedEvent::WebVital { .. } => "web_vitals", QueuedEvent::Replay { .. } => "session_replays", } @@ -157,7 +157,7 @@ impl InMemoryBatch { QueuedEvent::ModsEvent { row, tracking } => self.mods_events.push((row, tracking)), QueuedEvent::Error(e) => self.errors.push(e), QueuedEvent::ErrorTracking { row, tracking } => { - self.error_trackings.push((row, tracking)) + self.error_trackings.push((*row, tracking)) } QueuedEvent::WebVital { row, tracking } => self.web_vitals.push((row, tracking)), QueuedEvent::Replay { row, tracking } => self.replays.push((row, tracking)), @@ -180,11 +180,12 @@ impl InMemoryBatch { .map(|(row, tracking)| QueuedEvent::ModsEvent { row, tracking }), ); result.extend(self.errors.into_iter().map(QueuedEvent::Error)); - result.extend( - self.error_trackings - .into_iter() - .map(|(row, tracking)| QueuedEvent::ErrorTracking { row, tracking }), - ); + result.extend(self.error_trackings.into_iter().map(|(row, tracking)| { + QueuedEvent::ErrorTracking { + row: Box::new(row), + tracking, + } + })); result.extend( self.web_vitals .into_iter() @@ -1102,22 +1103,39 @@ mod tests { ); assert_eq!( QueuedEvent::ErrorTracking { - row: ErrorTrackingRow { + row: Box::new(ErrorTrackingRow { id: Uuid::new_v4(), project_id: Uuid::new_v4(), hash: "hash".to_string(), error_hash: "error-hash".to_string(), count: 3, - data_entry_id: Uuid::new_v4(), + data_entry_id: Some(Uuid::new_v4()), session_id: None, identity_key: None, build_id: None, + plugin_version: String::new(), + source_kind: "error".to_string(), + entry_session_id: String::new(), + entry_country: String::new(), + entry_browser: String::new(), + entry_device: String::new(), + entry_os: String::new(), + player_count: None, + online_mode: None, + minecraft_version: String::new(), + server_type: String::new(), + java_version: String::new(), + os_version: String::new(), + os_arch: String::new(), + core_count: None, + entry_data: "{}".to_string(), + context: None, created_at: Utc::now(), - }, + }), tracking: None, } .datasource(), - "error_occurences" + "error_occurences_v2" ); assert_eq!( QueuedEvent::WebVital { diff --git a/src/handler/collect.rs b/src/handler/collect.rs index 3cc35ff..0d6a3d3 100644 --- a/src/handler/collect.rs +++ b/src/handler/collect.rs @@ -1,7 +1,8 @@ use super::{ - MODS_EVENT_FIELDS, check_ip_allowed, error_response, extract_known_fields, get_authorization, - get_client_ip, get_country, insert_error_entries, insert_mods_event, load_project_context, - resolve_identity_key, success_response, + ErrorEntryParams, MODS_EVENT_FIELDS, build_mods_error_entry_details, check_ip_allowed, + error_response, extract_known_fields, get_authorization, get_client_ip, get_country, + insert_error_entries, insert_mods_event, load_project_context, resolve_identity_key, + success_response, }; use crate::batch_queue::{FailedRequest, RequestType, TrackingContext}; use crate::models::{AppState, Request}; @@ -95,6 +96,9 @@ pub async fn collect( organization_id: ctx.organization_id.as_deref().map(Into::into), }; + let error_entry_details = + build_mods_error_entry_details(server_id, country.as_deref(), &known, &valid_custom); + let data_entry_id = match insert_mods_event( &state.batch_queue, ctx.project_id, @@ -127,10 +131,14 @@ pub async fn collect( if let Err(e) = insert_error_entries( &state.batch_queue, ctx.project_id, - data_entry_id, + Some(data_entry_id), error, - identity_key, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + context: None, + details: error_entry_details.clone(), + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await { diff --git a/src/handler/error.rs b/src/handler/error.rs new file mode 100644 index 0000000..8ee80b3 --- /dev/null +++ b/src/handler/error.rs @@ -0,0 +1,89 @@ +use super::{ + ErrorEntryDetails, ErrorEntryParams, check_ip_allowed, error_response, get_authorization, + get_client_ip, insert_error_entries, load_project_context, resolve_identity_key, + success_response, +}; +use crate::batch_queue::TrackingContext; +use crate::models::{AppState, ErrorTracking}; +use axum::Json; +use axum::extract::State; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::IntoResponse; +use serde::Deserialize; +use serde_json::Value; +use std::collections::HashMap; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct ErrorRequest { + errors: Vec, + #[serde(default)] + session_id: Option, + #[serde(default)] + build_id: Option, + #[serde(default)] + context: Option, +} + +pub async fn error( + State(state): State, + headers: HeaderMap, + Json(payload): Json, +) -> impl IntoResponse { + let token = match get_authorization(&headers) { + Some(t) => t, + None => return error_response(StatusCode::UNAUTHORIZED, "Unauthorized"), + }; + + let ctx = match load_project_context(&state.pool, &token).await { + Ok(ctx) => ctx, + Err(e) => return e, + }; + + let client_ip = get_client_ip(&headers); + if let Err(msg) = check_ip_allowed(&ctx.ip_rules, client_ip) { + return error_response(StatusCode::FORBIDDEN, msg); + } + + if !ctx.error_tracking_enabled { + return error_response(StatusCode::FORBIDDEN, "Error tracking is not enabled"); + } + + let tracking_ctx = TrackingContext { + owner_id: ctx.billing_customer_id.as_str().into(), + token: token.into(), + organization_id: ctx.organization_id.as_deref().map(Into::into), + }; + + let context = payload + .context + .map(|v| serde_json::to_string(&v).unwrap_or_else(|_| "{}".to_string())); + + for mut error in payload.errors { + if error.session_id.is_none() { + error.session_id = payload.session_id.clone(); + } + if error.build_id.is_none() { + error.build_id = payload.build_id.clone(); + } + let identity_key = resolve_identity_key(error.session_id.as_deref(), None); + if let Err(e) = insert_error_entries( + &state.batch_queue, + ctx.project_id, + None, + error, + ErrorEntryParams { + identity_key, + context: context.clone(), + details: ErrorEntryDetails::error_only(), + tracking_ctx: Some(tracking_ctx.clone()), + }, + ) + .await + { + return e; + } + } + + success_response(HashMap::new()) +} diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 8f2ae5d..6f6fe2f 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1,10 +1,12 @@ mod collect; +mod error; mod identify; mod replay; mod vitals; mod web; pub use collect::collect; +pub use error::error; pub use identify::identify; pub use replay::replay; pub use vitals::vitals; @@ -37,6 +39,43 @@ static PROJECT_CACHE: LazyLock>> = LazyLock::n pub type HandlerResponse = (StatusCode, Json); +pub struct ErrorEntryParams { + pub identity_key: Option, + pub context: Option, + pub details: ErrorEntryDetails, + pub tracking_ctx: Option, +} + +#[derive(Clone, Default)] +pub struct ErrorEntryDetails { + pub plugin_version: String, + pub source_kind: String, + pub entry_session_id: String, + pub entry_country: String, + pub entry_browser: String, + pub entry_device: String, + pub entry_os: String, + pub player_count: Option, + pub online_mode: Option, + pub minecraft_version: String, + pub server_type: String, + pub java_version: String, + pub os_version: String, + pub os_arch: String, + pub core_count: Option, + pub entry_data: String, +} + +impl ErrorEntryDetails { + pub fn error_only() -> Self { + Self { + source_kind: "error".to_string(), + entry_data: "{}".to_string(), + ..Self::default() + } + } +} + #[derive(Debug, Deserialize, Default)] pub struct EncodingQuery { pub encoding: Option, @@ -343,6 +382,66 @@ fn to_custom_json(data: &HashMap) -> String { } } +fn read_known_string(known: &HashMap, key: &str) -> String { + known + .get(key) + .and_then(Value::as_str) + .map(str::to_owned) + .unwrap_or_default() +} + +fn read_known_f64(known: &HashMap, key: &str) -> Option { + known.get(key).and_then(Value::as_f64) +} + +fn read_known_bool(known: &HashMap, key: &str) -> Option { + known.get(key).and_then(Value::as_bool) +} + +pub fn build_web_error_entry_details( + session_id: Option<&str>, + country: Option<&str>, + known: &HashMap, + custom: &HashMap, +) -> ErrorEntryDetails { + ErrorEntryDetails { + source_kind: "web-analytics".to_string(), + entry_session_id: session_id.unwrap_or_default().to_string(), + entry_country: country.unwrap_or_default().to_string(), + entry_browser: read_known_string(known, "browser"), + entry_device: read_known_string(known, "device"), + entry_os: read_known_string(known, "os"), + os_version: read_known_string(known, "os_version"), + entry_data: to_custom_json(custom), + ..ErrorEntryDetails::default() + } +} + +pub fn build_mods_error_entry_details( + server_id: Uuid, + country: Option<&str>, + known: &HashMap, + custom: &HashMap, +) -> ErrorEntryDetails { + ErrorEntryDetails { + plugin_version: read_known_string(known, "plugin_version"), + source_kind: "minecraft-plugin".to_string(), + entry_session_id: server_id.to_string(), + entry_country: country.unwrap_or_default().to_string(), + entry_os: read_known_string(known, "os_name"), + player_count: read_known_f64(known, "player_count"), + online_mode: read_known_bool(known, "online_mode"), + minecraft_version: read_known_string(known, "minecraft_version"), + server_type: read_known_string(known, "server_type"), + java_version: read_known_string(known, "java_version"), + os_version: read_known_string(known, "os_version"), + os_arch: read_known_string(known, "os_arch"), + core_count: read_known_f64(known, "core_count"), + entry_data: to_custom_json(custom), + ..ErrorEntryDetails::default() + } +} + /// Known internal fields for web_events row. These are extracted before /// datasource validation so they always reach the Tinybird row. const WEB_EVENT_FIELDS: &[&str] = &[ @@ -515,10 +614,9 @@ fn build_error_rows(error: &Error, errors: &mut Vec) -> String { pub async fn insert_error_entries( batch_queue: &BatchQueue, project_id: Uuid, - data_entry_id: Uuid, + data_entry_id: Option, data: ErrorTracking, - identity_key: Option, - tracking_ctx: Option, + params: ErrorEntryParams, ) -> Result<(), HandlerResponse> { let mut error_rows = Vec::new(); let error_hash = build_error_rows(&data.error, &mut error_rows); @@ -543,15 +641,32 @@ pub async fn insert_error_entries( count: occurrence_count, data_entry_id, session_id: data.session_id.clone(), - identity_key, + identity_key: params.identity_key, build_id: data.build_id.clone(), + plugin_version: params.details.plugin_version, + source_kind: params.details.source_kind, + entry_session_id: params.details.entry_session_id, + entry_country: params.details.entry_country, + entry_browser: params.details.entry_browser, + entry_device: params.details.entry_device, + entry_os: params.details.entry_os, + player_count: params.details.player_count, + online_mode: params.details.online_mode, + minecraft_version: params.details.minecraft_version, + server_type: params.details.server_type, + java_version: params.details.java_version, + os_version: params.details.os_version, + os_arch: params.details.os_arch, + core_count: params.details.core_count, + entry_data: params.details.entry_data, + context: params.context, created_at, }; batch_queue .queue_event(QueuedEvent::ErrorTracking { - row: error_tracking, - tracking: tracking_ctx.clone(), + row: Box::new(error_tracking), + tracking: params.tracking_ctx, }) .await .map_err(|e| { @@ -624,6 +739,13 @@ async fn process_collect_request( organization_id: ctx.organization_id.as_deref().map(Into::into), }; + let error_entry_details = build_mods_error_entry_details( + server_id, + request.country.as_deref(), + &known, + &valid_custom, + ); + let data_entry_id = insert_mods_event( batch_queue, ctx.project_id, @@ -651,10 +773,14 @@ async fn process_collect_request( insert_error_entries( batch_queue, ctx.project_id, - data_entry_id, + Some(data_entry_id), error, - identity_key, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + context: None, + details: error_entry_details.clone(), + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await .map_err(|_| "Failed to queue error".to_string())?; @@ -746,6 +872,12 @@ async fn process_web_request( organization_id: ctx.organization_id.as_deref().map(Into::into), }; let fallback_identity = resolved_user_id.to_string(); + let error_entry_details = build_web_error_entry_details( + parsed.session_id.as_deref(), + request.country.as_deref(), + &known, + &valid_custom, + ); let data_entry_id = insert_web_event( batch_queue, @@ -776,10 +908,14 @@ async fn process_web_request( insert_error_entries( batch_queue, ctx.project_id, - data_entry_id, + Some(data_entry_id), error, - identity_key, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + context: None, + details: error_entry_details.clone(), + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await .map_err(|_| "Failed to queue error".to_string())?; diff --git a/src/handler/web.rs b/src/handler/web.rs index cc3862c..a96f888 100644 --- a/src/handler/web.rs +++ b/src/handler/web.rs @@ -1,8 +1,8 @@ use super::{ - EncodingQuery, WEB_EVENT_FIELDS, check_ip_allowed, decompress_body, error_response, - extract_known_fields, get_authorization, get_client_ip, get_country, get_request_origin, - insert_error_entries, insert_web_event, load_project_context, resolve_identity_key, - success_response, validate_hostname, + EncodingQuery, ErrorEntryParams, WEB_EVENT_FIELDS, build_web_error_entry_details, + check_ip_allowed, decompress_body, error_response, extract_known_fields, get_authorization, + get_client_ip, get_country, get_request_origin, insert_error_entries, insert_web_event, + load_project_context, resolve_identity_key, success_response, validate_hostname, }; use crate::batch_queue::{FailedRequest, RequestType, TrackingContext}; use crate::models::{AppState, ErrorTracking}; @@ -180,6 +180,12 @@ pub async fn web( organization_id: ctx.organization_id.as_deref().map(Into::into), }; let fallback_identity = resolved_user_id.to_string(); + let error_entry_details = build_web_error_entry_details( + session_id.as_deref(), + country.as_deref(), + &known, + &valid_custom, + ); let data_entry_id = if is_debounced { None @@ -217,10 +223,14 @@ pub async fn web( if let Err(e) = insert_error_entries( &state.batch_queue, ctx.project_id, - data_entry_id.unwrap_or_else(Uuid::new_v4), + data_entry_id, error, - identity_key, - Some(tracking_ctx.clone()), + ErrorEntryParams { + identity_key, + context: None, + details: error_entry_details.clone(), + tracking_ctx: Some(tracking_ctx.clone()), + }, ) .await { diff --git a/src/main.rs b/src/main.rs index feee6c5..4f02475 100644 --- a/src/main.rs +++ b/src/main.rs @@ -138,6 +138,7 @@ async fn main() { .route("/v1/web", post(handler::web)) .route("/v1/identify", post(handler::identify)) .route("/v1/vitals", post(handler::vitals)) + .route("/v1/error", post(handler::error)) .route("/v1/replay", post(handler::replay)) .layer(axum::middleware::from_fn(track_metrics)) .layer(RequestDecompressionLayer::new()) diff --git a/src/tinybird.rs b/src/tinybird.rs index 1a2bd14..a3aa121 100644 --- a/src/tinybird.rs +++ b/src/tinybird.rs @@ -79,10 +79,27 @@ pub struct ErrorTrackingRow { pub hash: String, pub error_hash: String, pub count: u32, - pub data_entry_id: Uuid, + pub data_entry_id: Option, pub session_id: Option, pub identity_key: Option, pub build_id: Option, + pub plugin_version: String, + pub source_kind: String, + pub entry_session_id: String, + pub entry_country: String, + pub entry_browser: String, + pub entry_device: String, + pub entry_os: String, + pub player_count: Option, + pub online_mode: Option, + pub minecraft_version: String, + pub server_type: String, + pub java_version: String, + pub os_version: String, + pub os_arch: String, + pub core_count: Option, + pub entry_data: String, + pub context: Option, #[serde(with = "chrono::serde::ts_milliseconds")] pub created_at: DateTime, } @@ -243,7 +260,7 @@ impl TinybirdClient { &self, rows: &[&ErrorTrackingRow], ) -> Result<(), TinybirdError> { - self.send_batch("error_occurences", rows).await + self.send_batch("error_occurences_v2", rows).await } pub async fn insert_web_vitals(&self, rows: &[&WebVitalRow]) -> Result<(), TinybirdError> {