diff --git a/CODEOWNERS b/CODEOWNERS index ce0eebb..650d9d6 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1 +1 @@ -* @makemake-kbo @0xgregthedev +* @makemake-kbo @0xgregthedev @fredo diff --git a/crates/server/src/api/accept.rs b/crates/server/src/api/accept.rs index cb75796..4f268ef 100644 --- a/crates/server/src/api/accept.rs +++ b/crates/server/src/api/accept.rs @@ -35,13 +35,14 @@ pub async fn accept_request( db: DbRequestSender, signer: &PrivateKeySigner, docker: Arc, + client_addr: std::net::SocketAddr, ) -> Result>, Infallible> where B: hyper::body::Body, { tracing::debug!(target = "api::accept_request", "Incoming request"); // Respond accordingly - let resp = match match_method(tx, &db, signer, docker).await { + let resp = match match_method(tx, &db, signer, docker, client_addr).await { Ok(rax) => rax, Err(e) => { let e = e.to_string(); @@ -58,18 +59,32 @@ macro_rules! accept { $io:expr, $db:expr, $signer:expr, - $docker:expr + $docker:expr, + $client_addr:expr ) => { let db_c = $db.clone(); + let signer_clone = $signer.clone(); + let docker_clone = $docker.clone(); + let client_addr = $client_addr; // Bind the incoming connection to our service if let Err(err) = hyper::server::conn::http1::Builder::new() // `service_fn` converts our function in a `Service` .serve_connection( $io, - hyper::service::service_fn(|req| { - let response = - $crate::api::accept::accept_request(req, db_c.clone(), $signer, $docker); - response + hyper::service::service_fn(move |req| { + let db_c = db_c.clone(); + let signer_clone = signer_clone.clone(); + let docker_clone = docker_clone.clone(); + async move { + $crate::api::accept::accept_request( + req, + db_c, + &signer_clone, + docker_clone, + client_addr, + ) + .await + } }), ) .with_upgrades() diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index 273132e..cf93489 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -176,7 +176,7 @@ fn serve_connection( // Spawn a tokio task to serve multiple connections concurrently tokio::task::spawn(async move { - crate::accept!(io, db_clone, &signer, docker_clone.clone()); + crate::accept!(io, db_clone, &signer, docker_clone.clone(), socketaddr); }); } diff --git a/crates/server/src/api/process_request.rs b/crates/server/src/api/process_request.rs index 06d7410..69749fc 100644 --- a/crates/server/src/api/process_request.rs +++ b/crates/server/src/api/process_request.rs @@ -1,4 +1,5 @@ use std::{ + net::SocketAddr, sync::Arc, time::Instant, }; @@ -51,6 +52,7 @@ use serde_json::{ Value, }; use tokio::sync::oneshot; +use uuid::Uuid; use http_body_util::BodyExt; use hyper::{ @@ -60,24 +62,41 @@ use hyper::{ use tracing::{ debug, info, + trace, + warn, }; /// Matches the incoming method sent by a client to a corresponding function. -#[tracing::instrument(level = "debug", skip_all, target = "api::match_method")] +#[tracing::instrument( + level = "debug", + skip_all, + target = "api::match_method", + fields(request_id, client_addr) +)] pub async fn match_method( req: Request, db: &DbRequestSender, signer: &PrivateKeySigner, docker: Arc, + client_addr: SocketAddr, ) -> Result where B: hyper::body::Body, { + // Generate unique request ID for correlation + let request_id = Uuid::new_v4(); + let client_ip = client_addr.ip().to_string(); + + // Add request context to the current tracing span + tracing::Span::current().record("request_id", tracing::field::display(&request_id)); + tracing::Span::current().record("client_addr", tracing::field::display(&client_addr)); + // Read body and parse as JSON-RPC request let headers = req.headers().clone(); let body = req.into_body().collect().await?.to_bytes(); let json_rpc: Value = serde_json::from_slice(&body)?; let method = json_rpc["method"].as_str().unwrap_or_default(); + let json_rpc_id = json_rpc["id"].clone(); let labels = [("http_method", method.to_string())]; gauge!("api_requests_active", &labels).increment(1); @@ -86,6 +105,9 @@ where info!( target: "json_rpc", %method, + %request_id, + %client_ip, + json_rpc_id = %json_rpc_id, ?headers, "Received request" ); @@ -96,13 +118,29 @@ where "da_submit_assertion" => { let code = match json_rpc["params"][0].as_str() { Some(code) => code, - _ => return Ok(rpc_error(&json_rpc, -32602, "Invalid params")), + _ => { + warn!(target: "json_rpc", method = "da_submit_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, "Invalid params: missing or invalid bytecode parameter"); + return Ok(rpc_error_with_request_id( + &json_rpc, + -32602, + "Invalid params", + &request_id, + )); + } }; // Validate hex inputs let bytecode = match alloy::hex::decode(code.trim_start_matches("0x")) { Ok(code) => code, - _ => return Ok(rpc_error(&json_rpc, 500, "Failed to decode hex")), + _ => { + warn!(target: "json_rpc", method = "da_submit_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, code = code, "Failed to decode hex bytecode"); + return Ok(rpc_error_with_request_id( + &json_rpc, + 500, + "Failed to decode hex", + &request_id, + )); + } }; debug!(target: "json_rpc", bytecode_len = bytecode.len(), "Submitting raw assertion bytecode"); @@ -111,16 +149,19 @@ where let id = keccak256(&bytecode); let signature = match signer.sign_hash(&id).await { Ok(sig) => sig, - Err(_) => { - return Ok(rpc_error( + Err(err) => { + warn!(target: "json_rpc", method = "da_submit_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, "Failed to sign assertion"); + return Ok(rpc_error_with_request_id( &json_rpc, -32604, "Internal Error: Failed to sign Assertion", - )) + &request_id, + )); } }; - debug!(target: "json_rpc", ?id, ?signature, bytecode_hex = hex::encode(&bytecode), "Compiled solidity source"); + trace!(target: "json_rpc", ?id, ?signature, bytecode_hex = hex::encode(&bytecode), "Raw submitted bytecode"); + debug!(target: "json_rpc", method = "da_submit_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, "Processed raw assertion submission, proceeding to database storage"); let stored_assertion = StoredAssertion::new( "NaN".to_string(), @@ -132,7 +173,26 @@ where Bytes::new(), ); - let res = process_add_assertion(id, stored_assertion, db, &json_rpc).await; + let res = process_add_assertion( + id, + stored_assertion, + db, + &json_rpc, + request_id, + &client_ip, + &json_rpc_id, + ) + .await; + + // Log success or failure based on response + if let Ok(ref response) = res { + if !response.contains("\"error\"") { + info!(target: "json_rpc", method = "da_submit_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, "Successfully processed raw assertion submission"); + } else { + warn!(target: "json_rpc", method = "da_submit_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, "Failed to process raw assertion submission"); + } + } + histogram!( "da_request_duration_seconds", "method" => "submit_solidity_assertion", @@ -142,17 +202,20 @@ where res } "da_submit_solidity_assertion" => { - let da_submission: DaSubmission = - match serde_json::from_value(json_rpc["params"][0].clone()) { - Ok(da_submission) => da_submission, - Err(err) => { - return Ok(rpc_error( - &json_rpc, - -32602, - format!("Invalid params: Failed to parse payload {err:?}").as_str(), - )) - } - }; + let da_submission: DaSubmission = match serde_json::from_value( + json_rpc["params"][0].clone(), + ) { + Ok(da_submission) => da_submission, + Err(err) => { + warn!(target: "json_rpc", method = "da_submit_solidity_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, "Failed to parse DaSubmission payload"); + return Ok(rpc_error_with_request_id( + &json_rpc, + -32602, + format!("Invalid params: Failed to parse payload {err:?}").as_str(), + &request_id, + )); + } + }; debug!(target: "json_rpc", compiler_version = da_submission.compiler_version, da_submission.solidity_source , "Compiling solidity source"); @@ -166,11 +229,13 @@ where { Ok(bytecode) => bytecode, Err(err) => { - return Ok(rpc_error( + warn!(target: "json_rpc", method = "da_submit_solidity_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, compiler_version = da_submission.compiler_version, contract_name = da_submission.assertion_contract_name, "Solidity compilation failed"); + return Ok(rpc_error_with_request_id( &json_rpc, -32603, &format!("Solidity Compilation Error: {err}"), - )) + &request_id, + )); } }; @@ -180,11 +245,13 @@ where ) { Ok(encoded_args) => encoded_args, Err(err) => { - return Ok(rpc_error( + warn!(target: "json_rpc", method = "da_submit_solidity_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, constructor_abi = da_submission.constructor_abi_signature, "Constructor args ABI encoding failed"); + return Ok(rpc_error_with_request_id( &json_rpc, -32603, &format!("Constructor args ABI Encoding Error: {err}"), - )) + &request_id, + )); } }; @@ -195,20 +262,21 @@ where let id = keccak256(&deployment_data); let prover_signature = match signer.sign_hash(&id).await { Ok(sig) => sig, - Err(_) => { - return Ok(rpc_error( + Err(err) => { + warn!(target: "json_rpc", method = "da_submit_solidity_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, "Failed to sign assertion"); + return Ok(rpc_error_with_request_id( &json_rpc, -32604, "Internal Error: Failed to sign Assertion", - )) + &request_id, + )); } }; - - debug!(target: "json_rpc", ?id, ?prover_signature, bytecode_hex = ?deployment_data, "Compiled solidity source"); + trace!(target: "json_rpc", ?id, ?prover_signature, bytecode_hex = ?deployment_data, "Compiled solidity source"); let stored_assertion = StoredAssertion::new( - da_submission.assertion_contract_name, - da_submission.compiler_version, + da_submission.assertion_contract_name.clone(), + da_submission.compiler_version.clone(), da_submission.solidity_source, deployment_data, prover_signature, @@ -216,7 +284,25 @@ where encoded_constructor_args, ); - let res = process_add_assertion(id, stored_assertion, db, &json_rpc).await; + let res = process_add_assertion( + id, + stored_assertion, + db, + &json_rpc, + request_id, + &client_ip, + &json_rpc_id, + ) + .await; + + if let Ok(ref response) = res { + if !response.contains("\"error\"") { + info!(target: "json_rpc", method = "da_submit_solidity_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, contract_name = da_submission.assertion_contract_name, compiler_version = da_submission.compiler_version, "Successfully compiled Solidity assertion"); + } else { + warn!(target: "json_rpc", method = "da_submit_solidity_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, contract_name = da_submission.assertion_contract_name, compiler_version = da_submission.compiler_version, "Failed to process Solidity assertion"); + } + } + histogram!( "da_request_duration_seconds", "method" => "submit_solidity_assertion", @@ -229,11 +315,13 @@ where let id = match json_rpc["params"][0].as_str() { Some(id) => id, None => { - return Ok(rpc_error( + warn!(target: "json_rpc", method = "da_get_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, "Invalid params: missing id parameter"); + return Ok(rpc_error_with_request_id( &json_rpc, -32602, "Invalid params: Didn't find id", - )) + &request_id, + )); } }; @@ -241,17 +329,28 @@ where let id: B256 = match id.trim_start_matches("0x").parse() { Ok(id) => id, _ => { - return Ok(rpc_error( + warn!(target: "json_rpc", method = "da_get_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, id = id, "Failed to decode hex ID"); + return Ok(rpc_error_with_request_id( &json_rpc, -32605, "Internal Error: Failed to decode hex of id", - )) + &request_id, + )); } }; debug!(target: "json_rpc", ?id, "Getting assertion"); - let res = process_get_assertion(id, db, &json_rpc).await; + let res = + process_get_assertion(id, db, &json_rpc, request_id, &client_ip, &json_rpc_id) + .await; + + // Log success for get_assertion if not an error response + if let Ok(ref response) = res { + if !response.contains("\"error\"") { + info!(target: "json_rpc", method = "da_get_assertion", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, "Successfully retrieved assertion"); + } + } histogram!( "da_request_duration_seconds", "method" => "get_assertion", @@ -261,6 +360,7 @@ where res } _ => { + warn!(target: "json_rpc", method = method, %request_id, %client_ip, json_rpc_id = %json_rpc_id, "Unknown JSON-RPC method"); gauge!("api_requests_active", &labels).decrement(1); counter!("api_requests_error_count", &labels).increment(1); histogram!( @@ -268,10 +368,29 @@ where "method" => "unknown" ) .record(req_start.elapsed().as_secs_f64()); - Ok(rpc_error(&json_rpc, -32601, "Method not found")) + Ok(rpc_error_with_request_id( + &json_rpc, + -32601, + "Method not found", + &request_id, + )) } }; + // Log final request completion status + match &result { + Ok(response) => { + if response.contains("\"error\"") { + debug!(target: "json_rpc", %method, %request_id, %client_ip, json_rpc_id = %json_rpc_id, duration_ms = req_start.elapsed().as_millis(), "Request completed with error"); + } else { + info!(target: "json_rpc", %method, %request_id, %client_ip, json_rpc_id = %json_rpc_id, duration_ms = req_start.elapsed().as_millis(), "Request completed successfully"); + } + } + Err(err) => { + warn!(target: "json_rpc", %method, %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, duration_ms = req_start.elapsed().as_millis(), "Request failed with internal error"); + } + } + result } @@ -321,14 +440,23 @@ async fn process_add_assertion( stored_assertion: StoredAssertion, db: &DbRequestSender, json_rpc: &Value, + request_id: Uuid, + client_ip: &str, + json_rpc_id: &Value, ) -> Result { // Store in database let (tx, rx) = oneshot::channel(); let ser_assertion = match bincode::serialize(&stored_assertion) { Ok(ser) => ser, - Err(_) => { - return Ok(rpc_error(json_rpc, -32603, "Internal error d")); + Err(err) => { + warn!(target: "json_rpc", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, "Failed to serialize assertion for database storage"); + return Ok(rpc_error_with_request_id( + json_rpc, + -32603, + "Internal error d", + &request_id, + )); } }; @@ -345,12 +473,30 @@ async fn process_add_assertion( }; match rx.await { - Ok(_) => Ok(rpc_response(json_rpc, result)), - Err(_) => Ok(rpc_error(json_rpc, -32603, "Internal error c")), + Ok(_) => { + debug!(target: "json_rpc", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, "Successfully stored assertion in database"); + Ok(rpc_response(json_rpc, result)) + } + Err(err) => { + debug!(target: "json_rpc", %request_id, %client_ip, json_rpc_id = %json_rpc_id, error = %err, "Database operation failed for assertion storage"); + Ok(rpc_error_with_request_id( + json_rpc, + -32603, + "Internal error c", + &request_id, + )) + } } } -async fn process_get_assertion(id: B256, db: &DbRequestSender, json_rpc: &Value) -> Result { +async fn process_get_assertion( + id: B256, + db: &DbRequestSender, + json_rpc: &Value, + request_id: Uuid, + client_ip: &str, + json_rpc_id: &Value, +) -> Result { let (tx, rx) = oneshot::channel(); let req = DbRequest { request: DbOperation::Get(id.to_vec()), @@ -374,7 +520,15 @@ async fn process_get_assertion(id: B256, db: &DbRequestSender, json_rpc: &Value) Ok(rpc_response(json_rpc, result)) } - None => Ok(rpc_error(json_rpc, 404, "Assertion not found")), + None => { + warn!(target: "json_rpc", %request_id, %client_ip, json_rpc_id = %json_rpc_id, ?id, "Assertion not found in database"); + Ok(rpc_error_with_request_id( + json_rpc, + 404, + "Assertion not found", + &request_id, + )) + } } } @@ -387,6 +541,7 @@ fn rpc_response(request: &Value, result: T) -> String { .to_string() } +#[allow(dead_code)] fn rpc_error(request: &Value, code: i128, message: &str) -> String { json!({ "jsonrpc": "2.0", @@ -399,6 +554,26 @@ fn rpc_error(request: &Value, code: i128, message: &str) -> String { .to_string() } +fn rpc_error_with_request_id( + request: &Value, + code: i128, + message: &str, + request_id: &Uuid, +) -> String { + json!({ + "jsonrpc": "2.0", + "error": { + "code": code, + "message": message, + "data": { + "request_id": request_id.to_string() + } + }, + "id": request["id"] + }) + .to_string() +} + #[cfg(test)] mod tests { use super::*; diff --git a/examples/test_http_client.rs b/examples/test_http_client.rs index 5455b52..1709768 100644 --- a/examples/test_http_client.rs +++ b/examples/test_http_client.rs @@ -1,3 +1,6 @@ +use std::str::FromStr; + +use alloy::primitives::FixedBytes; use assertion_da_client::DaClient; #[tokio::main] @@ -9,9 +12,15 @@ async fn main() -> Result<(), Box> { // Test with HTTPS endpoint println!("Testing HTTPS endpoint..."); - let _https_client = DaClient::new("https://demo-21-assertion-da.phylax.systems")?; + let https_client = DaClient::new("https://demo-21-assertion-da.phylax.systems")?; println!("✓ HTTPS client created successfully"); + let bytes: FixedBytes<32> = + FixedBytes::from_str("43ccaf21bc5cf9efce72530ecfecbd6d513e895546749720048e0e39bbedce37") // example id + .expect("REASON"); + let rax = https_client.fetch_assertion(bytes).await.unwrap(); + println!("Fetched assertion: {rax:?}"); + // Test with authentication println!("Testing authenticated client..."); let _auth_client = DaClient::new_with_auth(