Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion backend/api-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ pyo3 = { version = "0.18.0", features = ["auto-initialize"] }
env_logger = "0.11"

# Shared logic crate
shared-logic = { path = "../shared-logic" }
shared-logic = { path = "../shared-logic" }

# CSV serialization/deserialization
csv = "1.4"
86 changes: 85 additions & 1 deletion backend/api-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum::{
routing::{get, post},
Json,
Router,
body::Bytes,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
Expand All @@ -17,10 +18,13 @@ use pyo3::Python;
use pyo3::types::{PyList, PyModule, PyTuple};
use pyo3::PyResult;
use pyo3::{IntoPy, ToPyObject};
use chrono::{DateTime, Utc};
use axum::http::{HeaderMap, HeaderValue, header};
use axum::response::IntoResponse;
use rand_core::OsRng;

// shared logic library
use shared_logic::db::{initialize_connection, DbClient};
use shared_logic::db::{DbClient, get_eeg_time_range, initialize_connection, export_eeg_data_as_csv};
use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState};

// Argon2 imports
Expand All @@ -36,6 +40,21 @@ struct AppState {
db_client: DbClient,
}

// define request struct for exporting EEG data
/// Request body for POST /api/sessions/{session_id}/eeg_data/export.
#[derive(Deserialize)]
struct ExportEEGRequest {
    /// Name the download should be saved under (embedded in Content-Disposition).
    filename: String,
    /// Export configuration: format, header flag, optional time window.
    options: ExportOptions,
}

/// Options controlling how the EEG export is produced.
#[derive(Deserialize)]
struct ExportOptions {
    /// Requested output format; only "csv" is currently supported.
    format: String,
    /// Whether the CSV output should begin with a header row.
    /// camelCase to match the JSON field name sent by the frontend;
    /// suppress the non_snake_case warning rather than silently renaming
    /// the wire format.
    #[allow(non_snake_case)]
    includeHeader: bool,
    /// Start of the export window; when absent the server resolves a default
    /// (earliest timestamp for the session).
    start_time: Option<DateTime<Utc>>,
    /// End of the export window; when absent the server defaults to "now".
    end_time: Option<DateTime<Utc>>,
}


#[derive(Debug, Clone, Deserialize)]
pub struct LoginRequest {
Expand Down Expand Up @@ -247,6 +266,68 @@ async fn get_frontend_state(
}
}

// Handler for POST /api/sessions/{session_id}/eeg_data/export
async fn export_eeg_data(
Copy link
Copy Markdown
Collaborator

@HeisSteve HeisSteve Mar 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a massive function with too many responsibility. I would recommend to break it down into bunch of helpers to improve readability and modularity

State(app_state): State<AppState>,
Path(session_id): Path<i32>,
Json(request): Json<ExportEEGRequest>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
info!("Received request to export EEG data for session {}", session_id);

// right now the only export format supported is CSV, so we just check for that
Copy link
Copy Markdown
Collaborator

@HeisSteve HeisSteve Mar 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prechecks can be in it's own helper function. Make it a more modular function, since the logic for selecting a time frame(start and end time) is not Export specific. This will reduce code duplication

`

// check for time range, else use defaults
// for end time, we default to the current time
// for start time, we default to the earliest timestamp for the session
let end_time = match request.options.end_time {
    Some(t) => t,
    None => Utc::now(),
};

let start_time = match request.options.start_time {
    Some(t) => t,
    None => {
        // we call the helper function in db.rs to get the earliest timestamp
        match shared_logic::db::get_earliest_eeg_timestamp(&app_state.db_client, session_id).await {
            Ok(Some(t)) => t,
            Ok(None) => return Err((StatusCode::NOT_FOUND, format!("No EEG data found for session {}", session_id))),
            Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get earliest EEG timestamp: {}", e))),
        }
    }
};

if start_time > end_time {
    return Err((StatusCode::BAD_REQUEST, "start_time cannot be after end_time".to_string()));
}`

if request.options.format.to_lowercase() != "csv" {
return Err((StatusCode::BAD_REQUEST, format!("Unsupported export format: {}", request.options.format)));
}

let (start_time, end_time) = get_eeg_time_range(&app_state.db_client, session_id, &request.options)
.await.map_err(|e| {
error!("Failed to get EEG time range: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get EEG time range: {}", e))
})?;

let header_included = request.options.includeHeader;

// finally call the export function in db.rs
let return_csv = match export_eeg_data_as_csv(&app_state.db_client, session_id, start_time, end_time, header_included).await {
Ok(csv_data) => csv_data,
Err(e) => {
error!("Failed to export EEG data: {}", e);
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to export EEG data: {}", e)));
}
};

// small safety: avoid quotes breaking header
let filename = request.filename.replace('"', "");

let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/csv; charset=utf-8"));

let content_disp = format!("attachment; filename=\"{}\"", filename);
headers.insert(
header::CONTENT_DISPOSITION,
HeaderValue::from_str(&content_disp).map_err(|e| {
(StatusCode::BAD_REQUEST, format!("Invalid filename for header: {}", e))
})?,
);

// return CSV directly as the body
Ok((headers, return_csv))

}

// Handler for POST /api/sessions/{session_id}/eeg_data/import
/// Imports EEG data for a session from raw CSV bytes in the request body.
///
/// Delegates parsing and insertion to the shared-logic helper; responds with
/// a JSON success marker, or 500 with the underlying error message on failure.
async fn import_eeg_data(
    State(app_state): State<AppState>,
    Path(session_id): Path<i32>,
    // we expect the CSV data to be sent as raw text in the body of the request
    body: Bytes,
) -> Result<Json<Value>, (StatusCode, String)> {
    match shared_logic::db::import_eeg_data_from_csv(&app_state.db_client, session_id, &body).await {
        Ok(_) => Ok(Json(json!({"status": "success"}))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to import EEG data: {}", e))),
    }
}



Expand Down Expand Up @@ -353,6 +434,9 @@ async fn main() {
.route("/api/sessions/:session_id/frontend-state", post(set_frontend_state))
.route("/api/sessions/:session_id/frontend-state", get(get_frontend_state))

.route("/api/sessions/:session_id/eeg_data/export", post(export_eeg_data))
.route("/api/sessions/:session_id/eeg_data/import", post(import_eeg_data))

// Share application state with all handlers
.with_state(app_state);

Expand Down
8 changes: 8 additions & 0 deletions backend/migrations/20263101120000_sessions_on_eeg_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- note that this assumes that eeg_data has no rows (since as of this migration there should be no real data yet)
-- Adding a NOT NULL column with no default fails on a non-empty table, so
-- clear out any stray test rows here instead of relying on a manual
-- pre-migration step. Safe because no real data exists yet (see above).
TRUNCATE TABLE eeg_data;

ALTER TABLE eeg_data
ADD COLUMN session_id INTEGER NOT NULL,
ADD CONSTRAINT fk_session FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE;

-- we can create an index on session_id and time, since the bulk of our queries will be filtering based on these
CREATE INDEX eeg_data_session_time_idx ON eeg_data (session_id, time DESC); -- using DESC since i'm expecting recent data to be more relevant
5 changes: 4 additions & 1 deletion backend/shared-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ rand_core = "0.6"

# working with python
pyo3 = { version = "0.18.0", features = ["auto-initialize"] }
numpy = "0.18"
numpy = "0.18"

# CSV serialization/deserialization
csv = "1.4"
10 changes: 7 additions & 3 deletions backend/shared-logic/src/bc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub async fn start_broadcast(
write: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
cancel_token: CancellationToken,
processing_config: ProcessingConfig, // takes in signal processing configuration from frontend
session_id: i32, // takes in session id to tag incoming data with the correct session
windowing_rx: watch::Receiver<WindowingConfig> // takes in windowing configuration from frontend
) {
let (tx, _rx) = broadcast::channel::<Arc<EEGDataPacket>>(1000); // size of the broadcast buffer, not recommand below 500, websocket will miss messages
Expand Down Expand Up @@ -55,7 +56,7 @@ pub async fn start_broadcast(

// Subscribe for database Receiver
tokio::spawn(async move {
db_receiver( rx_db).await;
db_receiver( rx_db, session_id).await;
});

//waits for sender to complete.
Expand Down Expand Up @@ -111,7 +112,10 @@ pub async fn ws_receiver(write: &Arc<Mutex<SplitSink<WebSocketStream<TcpStream>,

//db_broadcast_receiver takes EEGDataPacket struct from the broadcast sender and inserts it into the database
// it inserts as a batch of 100.
pub async fn db_receiver(mut rx_db: Receiver<Arc<EEGDataPacket>>){
pub async fn db_receiver(
mut rx_db: Receiver<Arc<EEGDataPacket>>,
session_id: i32,
){
let db_client = get_db_client();

let mut packet_count = 0; // for debug purposes
Expand All @@ -131,7 +135,7 @@ pub async fn db_receiver(mut rx_db: Receiver<Arc<EEGDataPacket>>){
// Insert the packet directly
tokio::spawn(async move {
let now = Instant::now(); // for debug purposes
if let Err(e) = insert_batch_eeg(&db_client_clone, &eeg_packet).await {
if let Err(e) = insert_batch_eeg(&db_client_clone, session_id, &eeg_packet).await {
error!("Packet insert failed: {:?}", e);
}
info!("Packet insert took {:?}", now.elapsed()); // for debug purposes
Expand Down
Loading
Loading