Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions src/batch_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub enum QueuedEvent {
},
Error(ErrorRow),
ErrorTracking {
row: ErrorTrackingRow,
row: Box<ErrorTrackingRow>,
#[serde(skip_serializing_if = "Option::is_none")]
tracking: Option<TrackingContext>,
},
Expand All @@ -100,7 +100,7 @@ impl QueuedEvent {
QueuedEvent::WebEvent { .. } => "web_events",
QueuedEvent::ModsEvent { .. } => "mods_events",
QueuedEvent::Error(_) => "errors",
QueuedEvent::ErrorTracking { .. } => "error_occurences",
QueuedEvent::ErrorTracking { .. } => "error_occurences_v2",
QueuedEvent::WebVital { .. } => "web_vitals",
QueuedEvent::Replay { .. } => "session_replays",
}
Expand Down Expand Up @@ -157,7 +157,7 @@ impl InMemoryBatch {
QueuedEvent::ModsEvent { row, tracking } => self.mods_events.push((row, tracking)),
QueuedEvent::Error(e) => self.errors.push(e),
QueuedEvent::ErrorTracking { row, tracking } => {
self.error_trackings.push((row, tracking))
self.error_trackings.push((*row, tracking))
}
QueuedEvent::WebVital { row, tracking } => self.web_vitals.push((row, tracking)),
QueuedEvent::Replay { row, tracking } => self.replays.push((row, tracking)),
Expand All @@ -180,11 +180,12 @@ impl InMemoryBatch {
.map(|(row, tracking)| QueuedEvent::ModsEvent { row, tracking }),
);
result.extend(self.errors.into_iter().map(QueuedEvent::Error));
result.extend(
self.error_trackings
.into_iter()
.map(|(row, tracking)| QueuedEvent::ErrorTracking { row, tracking }),
);
result.extend(self.error_trackings.into_iter().map(|(row, tracking)| {
QueuedEvent::ErrorTracking {
row: Box::new(row),
tracking,
}
}));
result.extend(
self.web_vitals
.into_iter()
Expand Down Expand Up @@ -1102,22 +1103,39 @@ mod tests {
);
assert_eq!(
QueuedEvent::ErrorTracking {
row: ErrorTrackingRow {
row: Box::new(ErrorTrackingRow {
id: Uuid::new_v4(),
project_id: Uuid::new_v4(),
hash: "hash".to_string(),
error_hash: "error-hash".to_string(),
count: 3,
data_entry_id: Uuid::new_v4(),
data_entry_id: Some(Uuid::new_v4()),
session_id: None,
identity_key: None,
build_id: None,
plugin_version: String::new(),
source_kind: "error".to_string(),
entry_session_id: String::new(),
entry_country: String::new(),
entry_browser: String::new(),
entry_device: String::new(),
entry_os: String::new(),
player_count: None,
online_mode: None,
minecraft_version: String::new(),
server_type: String::new(),
java_version: String::new(),
os_version: String::new(),
os_arch: String::new(),
core_count: None,
entry_data: "{}".to_string(),
context: None,
created_at: Utc::now(),
},
}),
tracking: None,
}
.datasource(),
"error_occurences"
"error_occurences_v2"
);
assert_eq!(
QueuedEvent::WebVital {
Expand Down
20 changes: 14 additions & 6 deletions src/handler/collect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{
MODS_EVENT_FIELDS, check_ip_allowed, error_response, extract_known_fields, get_authorization,
get_client_ip, get_country, insert_error_entries, insert_mods_event, load_project_context,
resolve_identity_key, success_response,
ErrorEntryParams, MODS_EVENT_FIELDS, build_mods_error_entry_details, check_ip_allowed,
error_response, extract_known_fields, get_authorization, get_client_ip, get_country,
insert_error_entries, insert_mods_event, load_project_context, resolve_identity_key,
success_response,
};
use crate::batch_queue::{FailedRequest, RequestType, TrackingContext};
use crate::models::{AppState, Request};
Expand Down Expand Up @@ -95,6 +96,9 @@ pub async fn collect(
organization_id: ctx.organization_id.as_deref().map(Into::into),
};

let error_entry_details =
build_mods_error_entry_details(server_id, country.as_deref(), &known, &valid_custom);

let data_entry_id = match insert_mods_event(
&state.batch_queue,
ctx.project_id,
Expand Down Expand Up @@ -127,10 +131,14 @@ pub async fn collect(
if let Err(e) = insert_error_entries(
&state.batch_queue,
ctx.project_id,
data_entry_id,
Some(data_entry_id),
error,
identity_key,
Some(tracking_ctx.clone()),
ErrorEntryParams {
identity_key,
context: None,
details: error_entry_details.clone(),
tracking_ctx: Some(tracking_ctx.clone()),
},
)
.await
{
Expand Down
89 changes: 89 additions & 0 deletions src/handler/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use super::{
ErrorEntryDetails, ErrorEntryParams, check_ip_allowed, error_response, get_authorization,
get_client_ip, insert_error_entries, load_project_context, resolve_identity_key,
success_response,
};
use crate::batch_queue::TrackingContext;
use crate::models::{AppState, ErrorTracking};
use axum::Json;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::response::IntoResponse;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ErrorRequest {
errors: Vec<ErrorTracking>,
#[serde(default)]
session_id: Option<String>,
#[serde(default)]
build_id: Option<String>,
#[serde(default)]
context: Option<Value>,
}

pub async fn error(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<ErrorRequest>,
) -> impl IntoResponse {
let token = match get_authorization(&headers) {
Some(t) => t,
None => return error_response(StatusCode::UNAUTHORIZED, "Unauthorized"),
};

let ctx = match load_project_context(&state.pool, &token).await {
Ok(ctx) => ctx,
Err(e) => return e,
};

let client_ip = get_client_ip(&headers);
if let Err(msg) = check_ip_allowed(&ctx.ip_rules, client_ip) {
return error_response(StatusCode::FORBIDDEN, msg);
}

if !ctx.error_tracking_enabled {
return error_response(StatusCode::FORBIDDEN, "Error tracking is not enabled");
}

let tracking_ctx = TrackingContext {
owner_id: ctx.billing_customer_id.as_str().into(),
token: token.into(),
organization_id: ctx.organization_id.as_deref().map(Into::into),
};

let context = payload
.context
.map(|v| serde_json::to_string(&v).unwrap_or_else(|_| "{}".to_string()));

for mut error in payload.errors {
if error.session_id.is_none() {
error.session_id = payload.session_id.clone();
}
if error.build_id.is_none() {
error.build_id = payload.build_id.clone();
}
let identity_key = resolve_identity_key(error.session_id.as_deref(), None);
if let Err(e) = insert_error_entries(
&state.batch_queue,
ctx.project_id,
None,
error,
ErrorEntryParams {
identity_key,
context: context.clone(),
details: ErrorEntryDetails::error_only(),
tracking_ctx: Some(tracking_ctx.clone()),
},
)
.await
{
return e;
}
}

success_response(HashMap::new())
}
Loading