diff --git a/Cargo.lock b/Cargo.lock index 96f863a..cb28ef7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -895,7 +895,6 @@ dependencies = [ "serde", "serde_json", "tempfile", - "thiserror", "time", "ulid", ] @@ -1008,6 +1007,7 @@ dependencies = [ "edda-bridge-claude", "edda-core", "edda-derive", + "edda-ingestion", "edda-ledger", "edda-store", "globset", diff --git a/crates/edda-serve/Cargo.toml b/crates/edda-serve/Cargo.toml index 9880875..1a74197 100644 --- a/crates/edda-serve/Cargo.toml +++ b/crates/edda-serve/Cargo.toml @@ -17,6 +17,7 @@ edda-ledger = { path = "../edda-ledger", version = "0.1.1" } edda-aggregate = { path = "../edda-aggregate", version = "0.1.1" } edda-store = { path = "../edda-store", version = "0.1.1" } edda-bridge-claude = { path = "../edda-bridge-claude", version = "0.1.1" } +edda-ingestion = { path = "../edda-ingestion", version = "0.1.1" } axum = "0.8" tracing = { workspace = true } tokio = { version = "1", features = ["rt-multi-thread", "net", "time"] } diff --git a/crates/edda-serve/src/lib.rs b/crates/edda-serve/src/lib.rs index 5ff0fb2..5f175b7 100644 --- a/crates/edda-serve/src/lib.rs +++ b/crates/edda-serve/src/lib.rs @@ -206,6 +206,20 @@ pub async fn serve(repo_root: &Path, config: ServeConfig) -> anyhow::Result<()> .route("/api/snapshots/{context_hash}", get(get_snapshots_by_hash)) .route("/api/villages/{village_id}/stats", get(get_village_stats)) .route("/api/patterns", get(get_patterns)) + .route("/api/ingestion/evaluate", post(post_ingestion_evaluate)) + .route( + "/api/ingestion/records", + post(post_ingestion_record).get(get_ingestion_records), + ) + .route("/api/ingestion/suggestions", get(get_ingestion_suggestions)) + .route( + "/api/ingestion/suggestions/{id}/accept", + post(post_suggestion_accept), + ) + .route( + "/api/ingestion/suggestions/{id}/reject", + post(post_suggestion_reject), + ) .route("/api/pair/new", post(create_pairing)) .route("/api/pair/list", get(list_paired_devices)) .route("/api/pair/revoke", post(revoke_device)) @@ -319,6 +333,20 @@ fn router(repo_root: &Path) -> Router { .route("/api/snapshots/{context_hash}", get(get_snapshots_by_hash)) .route("/api/villages/{village_id}/stats", get(get_village_stats)) .route("/api/patterns", get(get_patterns)) + .route("/api/ingestion/evaluate", post(post_ingestion_evaluate)) + .route( + "/api/ingestion/records", + post(post_ingestion_record).get(get_ingestion_records), + ) + .route("/api/ingestion/suggestions", get(get_ingestion_suggestions)) + .route( + "/api/ingestion/suggestions/{id}/accept", + post(post_suggestion_accept), + ) + .route( + "/api/ingestion/suggestions/{id}/reject", + post(post_suggestion_reject), + ) .route("/pair", get(complete_pairing)) .route("/api/pair/new", post(create_pairing)) .route("/api/pair/list", get(list_paired_devices)) @@ -4076,6 +4104,288 @@ async fn get_event_stream( )) } +// ── Ingestion types ── + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct EvaluateBody { + event_type: String, + source_layer: String, + #[serde(default)] + source_refs: Vec, + #[serde(default)] + summary: Option, + #[serde(default)] + detail: Option, + #[serde(default)] + tags: Vec, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct EvaluateResponse { + action: String, + #[serde(skip_serializing_if = "Option::is_none")] + record_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + suggestion_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + reason: Option, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ManualIngestBody { + event_type: String, + source_layer: String, + #[serde(default)] + source_refs: Vec, + summary: String, + detail: serde_json::Value, + #[serde(default)] + tags: Vec, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct IngestionRecordsQuery { + #[serde(default)] + limit: Option, + #[serde(default)] + source_layer: Option, + #[serde(default)] + trigger_type: Option, +} + +// ── Ingestion handlers ── + +fn time_now_rfc3339() -> String { + time::OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .expect("RFC3339 formatting should not fail") +} + +// POST /api/ingestion/evaluate +async fn post_ingestion_evaluate( + State(state): State>, + body: Result, JsonRejection>, +) -> Result { + let Json(body) = body.map_err(|e| AppError::Validation(e.body_text()))?; + + let layer: edda_ingestion::SourceLayer = body + .source_layer + .parse() + .map_err(|e: String| AppError::Validation(e))?; + + let result = edda_ingestion::evaluate_trigger(&body.event_type, &body.source_layer); + + match result { + edda_ingestion::TriggerResult::AutoIngest => { + let ledger = state.open_ledger()?; + let _lock = WorkspaceLock::acquire(&ledger.paths)?; + + let summary = body + .summary + .unwrap_or_else(|| format!("{} from {}", body.event_type, body.source_layer)); + let record = edda_ingestion::IngestionRecord { + id: edda_ingestion::IngestionRecord::new_id("prec"), + trigger_type: edda_ingestion::TriggerType::Auto, + event_type: body.event_type, + source_layer: layer, + source_refs: body.source_refs, + summary, + detail: body.detail.unwrap_or(serde_json::json!({})), + tags: body.tags, + created_at: time_now_rfc3339(), + }; + + edda_ingestion::write_ingestion_record(&ledger, &record)?; + + Ok(( + StatusCode::CREATED, + Json(EvaluateResponse { + action: "ingested".to_string(), + record_id: Some(record.id), + suggestion_id: None, + reason: None, + }), + ) + .into_response()) + } + edda_ingestion::TriggerResult::SuggestIngest { reason } => { + let ledger = state.open_ledger()?; + let _lock = WorkspaceLock::acquire(&ledger.paths)?; + + let summary = body + .summary + .unwrap_or_else(|| format!("{} from {}", body.event_type, body.source_layer)); + let suggestion = edda_ingestion::Suggestion { + id: edda_ingestion::Suggestion::new_id(), + event_type: body.event_type, + source_layer: layer, + source_refs: body.source_refs, + summary, + suggested_because: reason.clone(), + detail: body.detail.unwrap_or(serde_json::json!({})), + tags: body.tags, + status: edda_ingestion::SuggestionStatus::Pending, + created_at: time_now_rfc3339(), + reviewed_at: None, + }; + + let queue = edda_ingestion::SuggestionQueue::new(&ledger); + let id = queue.enqueue(&suggestion)?; + + Ok(( + StatusCode::OK, + Json(EvaluateResponse { + action: "queued".to_string(), + record_id: None, + suggestion_id: Some(id), + reason: Some(reason), + }), + ) + .into_response()) + } + edda_ingestion::TriggerResult::Skip => Ok(( + StatusCode::OK, + Json(EvaluateResponse { + action: "skipped".to_string(), + record_id: None, + suggestion_id: None, + reason: None, + }), + ) + .into_response()), + } +} + +// POST /api/ingestion/records +async fn post_ingestion_record( + State(state): State>, + body: Result, JsonRejection>, +) -> Result { + let Json(body) = body.map_err(|e| AppError::Validation(e.body_text()))?; + + let layer: edda_ingestion::SourceLayer = body + .source_layer + .parse() + .map_err(|e: String| AppError::Validation(e))?; + + let ledger = state.open_ledger()?; + let _lock = WorkspaceLock::acquire(&ledger.paths)?; + + let record = edda_ingestion::IngestionRecord { + id: edda_ingestion::IngestionRecord::new_id("prec"), + trigger_type: edda_ingestion::TriggerType::Manual, + event_type: body.event_type, + source_layer: layer, + source_refs: body.source_refs, + summary: body.summary, + detail: body.detail, + tags: body.tags, + created_at: time_now_rfc3339(), + }; + + edda_ingestion::write_ingestion_record(&ledger, &record)?; + + Ok(( + StatusCode::CREATED, + Json(serde_json::json!({ "recordId": record.id })), + )) +} + +// GET /api/ingestion/records +async fn get_ingestion_records( + State(state): State>, + Query(params): Query, +) -> Result>, AppError> { + let ledger = state.open_ledger()?; + let events = ledger.iter_events_by_type("ingestion")?; + + let mut records: Vec = events + .into_iter() + .filter_map(|e| serde_json::from_value(e.payload).ok()) + .collect(); + + if let Some(ref layer) = params.source_layer { + records.retain(|r| r.source_layer.to_string() == *layer); + } + if let Some(ref tt) = params.trigger_type { + records.retain(|r| { + let label = match r.trigger_type { + edda_ingestion::TriggerType::Auto => "auto", + edda_ingestion::TriggerType::Suggested => "suggested", + edda_ingestion::TriggerType::Manual => "manual", + }; + label == tt.as_str() + }); + } + + let limit = params.limit.unwrap_or(50); + records.truncate(limit); + + Ok(Json(records)) +} + +// GET /api/ingestion/suggestions +async fn get_ingestion_suggestions( + State(state): State>, +) -> Result>, AppError> { + let ledger = state.open_ledger()?; + let queue = edda_ingestion::SuggestionQueue::new(&ledger); + let pending = queue.list_pending()?; + Ok(Json(pending)) +} + +// POST /api/ingestion/suggestions/{id}/accept +async fn post_suggestion_accept( + State(state): State>, + AxumPath(id): AxumPath, +) -> Result, AppError> { + let ledger = state.open_ledger()?; + let _lock = WorkspaceLock::acquire(&ledger.paths)?; + + // Pre-check for proper HTTP error codes + let row = ledger + .get_suggestion(&id)? + .ok_or_else(|| AppError::NotFound(format!("suggestion not found: {id}")))?; + if row.status != "pending" { + return Err(AppError::Conflict(format!( + "suggestion {id} has status '{}', expected 'pending'", + row.status + ))); + } + + let queue = edda_ingestion::SuggestionQueue::new(&ledger); + let record = queue.accept(&id)?; + Ok(Json(record)) +} + +// POST /api/ingestion/suggestions/{id}/reject +async fn post_suggestion_reject( + State(state): State>, + AxumPath(id): AxumPath, +) -> Result, AppError> { + let ledger = state.open_ledger()?; + let _lock = WorkspaceLock::acquire(&ledger.paths)?; + + // Pre-check for proper HTTP error codes + let row = ledger + .get_suggestion(&id)? + .ok_or_else(|| AppError::NotFound(format!("suggestion not found: {id}")))?; + if row.status != "pending" { + return Err(AppError::Conflict(format!( + "suggestion {id} has status '{}', expected 'pending'", + row.status + ))); + } + + let queue = edda_ingestion::SuggestionQueue::new(&ledger); + queue.reject(&id)?; + Ok(Json(serde_json::json!({ "ok": true }))) +} + // ── Tests ── #[cfg(test)] @@ -7980,4 +8290,485 @@ actors: assert_eq!(resp.status(), StatusCode::BAD_REQUEST); } + + // ── Ingestion integration tests ── + + #[tokio::test] + async fn ingestion_auto_ingest_via_evaluate() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // POST /api/ingestion/evaluate with auto-ingest trigger + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/evaluate") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "decision.commit", + "sourceLayer": "L1", + "summary": "Formal decision committed", + "detail": {"session": "ds_test"} + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::CREATED); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["action"], "ingested"); + assert!(json["recordId"].as_str().unwrap().starts_with("prec_")); + + // GET /api/ingestion/records — verify it was written + let app2 = router(tmp.path()); + let resp2 = app2 + .oneshot( + Request::builder() + .uri("/api/ingestion/records") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp2.status(), StatusCode::OK); + let body2 = axum::body::to_bytes(resp2.into_body(), usize::MAX) + .await + .unwrap(); + let records: Vec = serde_json::from_slice(&body2).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0]["eventType"], "decision.commit"); + assert_eq!(records[0]["triggerType"], "auto"); + } + + #[tokio::test] + async fn ingestion_suggest_then_accept() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // POST /api/ingestion/evaluate with suggest trigger + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/evaluate") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "route.changed", + "sourceLayer": "L1", + "summary": "Route changed in session", + "detail": {"path": "/api/foo"} + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["action"], "queued"); + let sug_id = json["suggestionId"].as_str().unwrap().to_string(); + assert!(sug_id.starts_with("sug_")); + assert!(json["reason"].as_str().is_some()); + + // GET /api/ingestion/suggestions — 1 pending + let app2 = router(tmp.path()); + let resp2 = app2 + .oneshot( + Request::builder() + .uri("/api/ingestion/suggestions") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp2.status(), StatusCode::OK); + let body2 = axum::body::to_bytes(resp2.into_body(), usize::MAX) + .await + .unwrap(); + let suggestions: Vec = serde_json::from_slice(&body2).unwrap(); + assert_eq!(suggestions.len(), 1); + assert_eq!(suggestions[0]["id"], sug_id); + + // POST /api/ingestion/suggestions/{id}/accept + let app3 = router(tmp.path()); + let resp3 = app3 + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/ingestion/suggestions/{sug_id}/accept")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp3.status(), StatusCode::OK); + let body3 = axum::body::to_bytes(resp3.into_body(), usize::MAX) + .await + .unwrap(); + let record: serde_json::Value = serde_json::from_slice(&body3).unwrap(); + assert!(record["id"].as_str().unwrap().starts_with("prec_")); + assert_eq!(record["triggerType"], "suggested"); + + // GET /api/ingestion/suggestions — empty after accept + let app4 = router(tmp.path()); + let resp4 = app4 + .oneshot( + Request::builder() + .uri("/api/ingestion/suggestions") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body4 = axum::body::to_bytes(resp4.into_body(), usize::MAX) + .await + .unwrap(); + let suggestions4: Vec = serde_json::from_slice(&body4).unwrap(); + assert!(suggestions4.is_empty()); + + // GET /api/ingestion/records — 1 record with trigger_type=suggested + let app5 = router(tmp.path()); + let resp5 = app5 + .oneshot( + Request::builder() + .uri("/api/ingestion/records") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body5 = axum::body::to_bytes(resp5.into_body(), usize::MAX) + .await + .unwrap(); + let records: Vec = serde_json::from_slice(&body5).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0]["triggerType"], "suggested"); + } + + #[tokio::test] + async fn ingestion_never_ingest_skips() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // POST /api/ingestion/evaluate with never-ingest trigger + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/evaluate") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "followup.draft", + "sourceLayer": "L1" + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["action"], "skipped"); + + // GET /api/ingestion/records — empty + let app2 = router(tmp.path()); + let resp2 = app2 + .oneshot( + Request::builder() + .uri("/api/ingestion/records") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body2 = axum::body::to_bytes(resp2.into_body(), usize::MAX) + .await + .unwrap(); + let records: Vec = serde_json::from_slice(&body2).unwrap(); + assert!(records.is_empty()); + } + + #[tokio::test] + async fn ingestion_manual_record() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // POST /api/ingestion/records (manual) + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/records") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "custom.event", + "sourceLayer": "L1", + "summary": "Manual ingestion test", + "detail": {"key": "value"} + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::CREATED); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert!(json["recordId"].as_str().unwrap().starts_with("prec_")); + + // GET /api/ingestion/records — 1 record with trigger_type=manual + let app2 = router(tmp.path()); + let resp2 = app2 + .oneshot( + Request::builder() + .uri("/api/ingestion/records") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body2 = axum::body::to_bytes(resp2.into_body(), usize::MAX) + .await + .unwrap(); + let records: Vec = serde_json::from_slice(&body2).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0]["triggerType"], "manual"); + } + + #[tokio::test] + async fn ingestion_accept_nonexistent_404() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/suggestions/sug_fake/accept") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn ingestion_reject_nonexistent_404() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/suggestions/sug_fake/reject") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn ingestion_suggest_then_reject() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // Queue a suggestion + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/evaluate") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "route.changed", + "sourceLayer": "L1", + "summary": "Route change to reject", + "detail": {} + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let sug_id = json["suggestionId"].as_str().unwrap().to_string(); + + // Reject it + let app2 = router(tmp.path()); + let resp2 = app2 + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/ingestion/suggestions/{sug_id}/reject")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp2.status(), StatusCode::OK); + let body2 = axum::body::to_bytes(resp2.into_body(), usize::MAX) + .await + .unwrap(); + let json2: serde_json::Value = serde_json::from_slice(&body2).unwrap(); + assert_eq!(json2["ok"], true); + + // GET /api/ingestion/records — empty (rejected, not written) + let app3 = router(tmp.path()); + let resp3 = app3 + .oneshot( + Request::builder() + .uri("/api/ingestion/records") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body3 = axum::body::to_bytes(resp3.into_body(), usize::MAX) + .await + .unwrap(); + let records: Vec = serde_json::from_slice(&body3).unwrap(); + assert!(records.is_empty()); + } + + #[tokio::test] + async fn ingestion_evaluate_invalid_layer_400() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/evaluate") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "decision.commit", + "sourceLayer": "L99" + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn ingestion_accept_already_accepted_409() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // Queue a suggestion + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/ingestion/evaluate") + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "eventType": "route.changed", + "sourceLayer": "L1", + "summary": "Route change for double-accept test", + "detail": {} + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let sug_id = json["suggestionId"].as_str().unwrap().to_string(); + + // Accept it (first time — should succeed) + let app2 = router(tmp.path()); + let resp2 = app2 + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/ingestion/suggestions/{sug_id}/accept")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp2.status(), StatusCode::OK); + + // Accept again — should get 409 Conflict + let app3 = router(tmp.path()); + let resp3 = app3 + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/ingestion/suggestions/{sug_id}/accept")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp3.status(), StatusCode::CONFLICT); + } }