diff --git a/requirements.txt b/requirements.txt
index 366ee7e..fbd5b30 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,3 +10,4 @@ streamlit>=1.20
pandas>=1.4
numpy>=1.23
typing-extensions>=4.0
+pybids>=0.22.0
\ No newline at end of file
diff --git a/ui/fmriprep_test.sh b/ui/fmriprep_test.sh
index c12a56d..f8a660b 100755
--- a/ui/fmriprep_test.sh
+++ b/ui/fmriprep_test.sh
@@ -1,10 +1,10 @@
pipeline_script="ui.py"
qc_pipeline="fmriprep"
-qc_task="anat_wf_qc"
-qc_json="sample_qc.json"
+qc_task="anat_wf_qc func_wf_qc"
+qc_json="sample_qc_multi.json"
participant_list="qc_participants.tsv"
output_dir="../output"
-port_number="8501"
+port_number="8503"
streamlit run $pipeline_script --server.port=$port_number -- \
--qc_json $qc_json \
diff --git a/ui/layout.py b/ui/layout.py
index fa27414..6691c7e 100644
--- a/ui/layout.py
+++ b/ui/layout.py
@@ -3,190 +3,302 @@
from datetime import datetime
import streamlit as st
from niivue_component import niivue_viewer
-from utils import parse_qc_config, load_mri_data, load_svg_data, save_qc_results_to_csv
+from utils import parse_all_qc_tasks, parse_qc_config, parse_bids_entities, group_svg_paths_by_run, load_mri_data, load_mesh_data, load_svg_data, save_qc_results_to_csv
from models import MetricQC, QCRecord
+@st.dialog("SVG Viewer", width="large")
+def show_svg_dialog(name: str, content: str) -> None:
+ st.markdown(f"**{name}**")
+    st.markdown(f'<div style="width:100%; overflow:auto;">{content}</div>', unsafe_allow_html=True)
-def app(participant_id, session_id, qc_pipeline, qc_task, qc_config_path, out_dir) -> None:
- """Main Streamlit layout: top inputs, middle two viewers, bottom QC controls."""
- st.set_page_config(layout="wide")
- # Top container: inputs
- top = st.container()
- with top:
- st.title("Welcome to Nipoppy QC-Studio! 🚀")
- # qc_pipeline = "fMRIPrep"
- # qc_task = "sdc-wf"
- st.subheader(f"QC Pipeline: {qc_pipeline}, QC task: {qc_task}")
+def _render_svg_items(svg_items: list[tuple[str, str]], key_prefix: str) -> None:
+ """Render a list of (name, content) SVG tuples with expand buttons."""
+ if not svg_items:
+ st.info("SVG montage not found or could not be loaded.")
+ return
+ if len(svg_items) == 1:
+ name, content = svg_items[0]
+ if st.button("🔍 View full size", key=f"{key_prefix}_expand_0"):
+ show_svg_dialog(name, content)
+ st.markdown(
+        f'<div style="width:100%; overflow:auto;">'
+        f'<div style="background:#fff;">'
+        f'{content}</div></div>',
+ unsafe_allow_html=True,
+ )
+ else:
+ tabs = st.tabs([name for name, _ in svg_items])
+ for i, (tab, (name, content)) in enumerate(zip(tabs, svg_items)):
+ with tab:
+ if st.button("🔍 View full size", key=f"{key_prefix}_expand_{i}"):
+ show_svg_dialog(name, content)
+ st.markdown(
+                    f'<div style="width:100%; overflow:auto;">'
+                    f'<div style="background:#fff;">'
+                    f'{content}</div></div>',
+ unsafe_allow_html=True,
+ )
- # show participant and session
- st.write(f"Participant ID: {participant_id} | Session ID: {session_id}")
- # Rater info
- rater_id = st.text_input("Rater name or ID: 🧑" )
- st.write("You entered:", rater_id)
-
- # Remove spaces
- rater_id = "".join(rater_id.split())
-
- # Split into two columns for collecting rater specific info
- exp_col, fatigue_col = st.columns([0.5, 0.5], gap="small")
-
- with exp_col:
- # Input rater experience as radio buttons
- options = ["Beginner (< 1 year experience)", "Intermediate (1-5 year experience)", "Expert (>5 year experience)"]
- # add radio buttons
- # experience_level = st.radio()
- rater_experience = st.radio("What is your QC experience level:", options)
- st.write("Experience level:", rater_experience)
-
- with fatigue_col:
- # Input rater experience as radio buttons
- options = ["Not at all", "A bit tired ☕", "Very tired ☕☕"]
- # add radio buttons
- # experience_level = st.radio()
- rater_fatigue = st.radio("How tired are you feeling:", options)
- st.write("Fatigue level:", rater_fatigue)
-
+def _render_task_panels(
+ task_key: str,
+ qc_config: dict,
+ participant_id: str,
+ session_id: str,
+ qc_pipeline: str,
+ out_dir: str,
+ rater_id: str,
+ rater_experience: str,
+ rater_fatigue: str,
+) -> None:
+ """Render the middle (NiiVue + SVG) and bottom (IQM + QC rating) panels
+ for a single QC task. All Streamlit widget keys are scoped with
+ ``task_key`` so multiple tasks can coexist on the same page.
+ """
+ PANEL_HEIGHT = 600
- # parse qc config
- qc_config = parse_qc_config(qc_config_path, qc_task)
- # print(f"qc config: {qc_config_path}, {qc_config}")
+ # Middle: side-by-side viewers
+ has_niivue = bool(qc_config.get("base_mri_image_path"))
+ has_svg = bool(qc_config.get("svg_montage_path"))
+ has_iqm_plot = bool(qc_config.get("iqm_path"))
+ has_mesh = bool(qc_config.get("mesh_paths"))
- # Middle: two side-by-side viewers
- middle = st.container()
- with middle:
- niivue_col, svg_col = st.columns([0.4, 0.6], gap="small")
+ if has_niivue and has_svg:
+ middle_left, middle_right = st.columns([0.4, 0.6], gap="small")
+ elif has_niivue:
+ middle_left = st.container()
+ middle_right = None
+ elif has_svg:
+ middle_left = None
+ middle_right = st.container()
+ else:
+ middle_left = middle_right = None
- with niivue_col:
- # Create a narrow controls column and a main viewer area inside the niivue column
+ if has_niivue and middle_left:
+ with middle_left:
+ st.subheader("MRI (3D) viewer: NiiVue")
cfg_col, view_col = st.columns([0.32, 0.68], gap="small")
with cfg_col:
st.header("Niivue Controls")
- # Persistent controls column (sidebar-like)
view_mode = st.selectbox(
"View Mode",
["multiplanar", "axial", "coronal", "sagittal", "3d"],
- help="Select the viewing perspective"
+ key=f"{task_key}_view_mode",
+ help="Select the viewing perspective",
)
-
- height = 600 #st.slider("Viewer Height (px)", 400, 1000, 600, 50)
+ height = 600
overlay_colormap = st.selectbox(
"Overlay Colormap",
- ["grey", "cool", "warm"],
- help="Select the colormap for the overlay"
+ ["grey", "cool", "warm", "red", "yellow"],
+ key=f"{task_key}_overlay_colormap",
+ help="Select the colormap for the overlay",
)
-
st.divider()
st.subheader("Display Settings")
- show_crosshair = st.checkbox("Show Crosshair", value=False)
- radiological = st.checkbox("Radiological Convention", value=False)
- show_colorbar = st.checkbox("Show Colorbar", value=True)
- interpolation = st.checkbox("Interpolation", value=True)
-
- # Toggle to show/hide overlay image in the Niivue column
- show_overlay = st.checkbox("Show overlay image", value=False)
+ show_crosshair = st.checkbox("Show Crosshair", value=False, key=f"{task_key}_crosshair")
+ radiological = st.checkbox("Radiological Convention", value=False, key=f"{task_key}_radiological")
+ show_colorbar = st.checkbox("Show Colorbar", value=True, key=f"{task_key}_colorbar")
+ interpolation = st.checkbox("Interpolation", value=True, key=f"{task_key}_interpolation")
+ show_overlay = st.checkbox("Show overlay image", value=False, key=f"{task_key}_show_overlay")
+ show_mesh = st.checkbox("Show FreeSurfer surfaces", value=has_mesh, disabled=not has_mesh, key=f"{task_key}_show_mesh")
with view_col:
st.header("3D MRI (Niivue)")
- # Show mri
mri_data = load_mri_data(qc_config)
if "base_mri_image_bytes" in mri_data:
base_mri_image_bytes = mri_data["base_mri_image_bytes"]
base_mri_name = str(qc_config.get("base_mri_image_path").name) if qc_config.get("base_mri_image_path") else "base_mri.nii"
-
try:
- # Prepare settings dictionary
settings = {
- "crosshair": show_crosshair,
- "radiological": radiological,
- "colorbar": show_colorbar,
- "interpolation": interpolation
+ "crosshair": show_crosshair,
+ "radiological": radiological,
+ "colorbar": show_colorbar,
+ "interpolation": interpolation,
}
-
- # Prepare optional overlays only if user enabled and overlay bytes exist
overlays = []
if show_overlay and "overlay_mri_image_bytes" in mri_data:
- overlays.append(
- {
- "data": mri_data["overlay_mri_image_bytes"],
- "name": "overlay",
- "colormap": overlay_colormap,
- "opacity": 0.5,
- }
- )
-
- # Build kwargs for niivue_viewer; include overlays only when present
+ overlays.append({
+ "data": mri_data["overlay_mri_image_bytes"],
+ "name": "overlay",
+ "colormap": overlay_colormap,
+ "opacity": 0.5,
+ })
+ meshes = []
+ if show_mesh and has_mesh:
+ meshes = load_mesh_data(qc_config)
+
overlay_state = f"{overlay_colormap}_{show_overlay}"
- viewer_key = f"niivue_{view_mode}_{overlay_state}"
+ mesh_state = f"mesh_{show_mesh}"
+ viewer_key = f"niivue_{task_key}_{view_mode}_{overlay_state}_{mesh_state}"
viewer_kwargs = {
"nifti_data": base_mri_image_bytes,
- "filename": base_mri_name,
- "height": height,
- "key": viewer_key,
- "view_mode": view_mode,
- "settings": settings,
+ "filename": base_mri_name,
+ "height": height,
+ "key": viewer_key,
+ "view_mode": view_mode,
+ "settings": settings,
+ "styled": True,
}
if overlays:
viewer_kwargs["overlays"] = overlays
-
- viewer_kwargs["styled"] = True
+ if meshes:
+ viewer_kwargs["meshes"] = meshes
niivue_viewer(**viewer_kwargs)
-
except Exception as e:
st.error(f"Failed to load base MRI in Niivue viewer: {e}")
else:
st.info("Base MRI image not found or could not be loaded.")
- with svg_col:
- st.header("SVG Montage")
- # Show SVG montage
- svg_data = load_svg_data(qc_config)
- if svg_data:
- st.components.v1.html(svg_data, height=600, scrolling=True)
- else:
+ # Group SVG paths by (session, run) — used by both the SVG panel and QC rating
+    svg_groups = group_svg_paths_by_run(qc_config)  # {(ses, task, run): [Path, ...]}
+ multi_group = len(svg_groups) > 1
+
+ def _group_label(ses: str | None, task: str | None, run: str | None) -> str:
+ parts = []
+ if ses: parts.append(f"ses-{ses}")
+ if task: parts.append(f"task-{task}")
+ if run: parts.append(f"run-{run}")
+ return " ".join(parts) if parts else "N/A"
+
+ if has_svg and middle_right:
+ with middle_right:
+ st.subheader("Report (SVG) Montage")
+ if not svg_groups:
st.info("SVG montage not found or could not be loaded.")
+ elif not multi_group:
+ paths = next(iter(svg_groups.values()))
+ _render_svg_items([(p.name, p.read_text()) for p in paths], task_key)
+ else:
+ group_tabs = st.tabs([_group_label(s, t, r) for s, t, r in svg_groups])
+ for tab, ((ses_val, task_val, run_val), paths) in zip(group_tabs, svg_groups.items()):
+ with tab:
+ key_prefix = f"{task_key}_ses{ses_val}_task{task_val}_run{run_val}"
+ _render_svg_items([(p.name, p.read_text()) for p in paths], key_prefix)
- # Bottom: QC metrics and radio buttons
- bottom = st.container()
- with bottom:
- # st.header("QC: Rating & Metrics")
- rating_col, iqm_col = st.columns([0.4, 0.6], gap="small")
- with iqm_col:
- st.subheader("QC Metrics")
- # Placeholder: user may compute or display metrics here
- st.write("Add QC metrics here (e.g., SNR, motion). This is a placeholder area.")
-
- with rating_col:
- st.subheader("QC Rating")
- rating = st.radio("Rate this qc-task:", options=("PASS", "FAIL", "UNCERTAIN"), index=0)
- notes = st.text_area("Notes (optional):")
- if st.button("💾 Save QC results to CSV", width=600):
- now = datetime.now()
- timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
- out_file = Path(out_dir) / f"{rater_id}_QC_status.tsv"
-
- record = QCRecord(
- participant_id=participant_id,
- session_id=session_id,
- qc_task=qc_task,
- pipeline=qc_pipeline,
- timestamp=timestamp,
- rater_id=rater_id,
- rater_experience=rater_experience,
- rater_fatigue=rater_fatigue,
- final_qc=rating,
- notes=notes,
- )
+ # Bottom: IQM plot + QC rating
+ st.divider()
+ if has_iqm_plot:
+ bot_left, bot_right = st.columns([0.4, 0.6], gap="small")
+ else:
+ bot_left = st.container()
+ bot_right = None
+
+ if has_iqm_plot and bot_right:
+ with bot_right:
+ st.subheader("IQM distributions")
+ iqm_paths = qc_config.get("iqm_path") or []
+ svg_iqms = [
+ (p.name, p.read_text())
+ for p in iqm_paths
+ if p and p.is_file() and p.suffix == ".svg"
+ ]
+ if svg_iqms:
+ _render_svg_items(svg_iqms, f"{task_key}_iqm")
+ else:
+ st.info("IQM file not found or unsupported format.")
+
+ with bot_left:
+ st.subheader("QC Rating")
+ bids = parse_bids_entities(qc_config)
+ task_id = bids["task_id"]
+
+ # One rating + save per (session, run) group
+ _ses_fallback = session_id.removeprefix("ses-") if session_id else None
+ group_keys = list(svg_groups.keys()) if multi_group else [(_ses_fallback, task_id, bids["run_id"])]
+ rating_tabs = st.tabs([_group_label(s, t, r) for s, t, r in group_keys]) if multi_group else [None]
+
+ for rating_tab, (ses_val, task_val, run_val) in zip(rating_tabs, group_keys):
+ ctx = rating_tab if multi_group else st.container()
+ scope = f"{task_key}_ses{ses_val}_task{task_val}_run{run_val}" if multi_group else task_key
+ with ctx:
+ rating = st.radio("Rate this qc-task:", options=("PASS", "FAIL", "UNCERTAIN"), index=None, key=f"{scope}_rating")
+ notes = st.text_area("Notes (optional):", key=f"{scope}_notes")
+ if st.button("💾 Save QC results to CSV", key=f"{scope}_save", width=600):
+ now = datetime.now()
+ timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
+ out_file = Path(out_dir) / f"{rater_id}_QC_status.tsv"
+ record = QCRecord(
+ participant_id=participant_id,
+ session_id=(f"ses-{ses_val}" if ses_val else session_id),
+ task_id=task_val,
+ run_id=run_val,
+ qc_task=task_key,
+ pipeline=qc_pipeline,
+ timestamp=timestamp,
+ rater_id=rater_id,
+ rater_experience=rater_experience,
+ rater_fatigue=rater_fatigue,
+ final_qc=rating,
+ notes=notes,
+ )
+ out_path = save_qc_results_to_csv(out_file, [record])
+ st.success(f"QC results saved to: {out_path}")
+
+
+def app(participant_id, session_id, qc_pipeline, qc_task, qc_config_path, out_dir) -> None:
+ """Main Streamlit layout: top inputs, middle two viewers, bottom QC controls.
+
+ ``qc_task`` may be a single task name string, a list of task name strings,
+ or ``None`` / ``"all"`` to render every task found in the config file.
+ """
+ st.set_page_config(layout="wide")
+
+ # Top: participant info + rater inputs (shared across all tasks)
+ top = st.container()
+ with top:
+ st.title("Welcome to Nipoppy QC-Studio! 🚀")
+ st.subheader(f"QC Pipeline: {qc_pipeline}")
+ st.write(f"Participant ID: {participant_id} | Session ID: {session_id}")
+
+ rater_id = st.text_input("Rater name or ID: 🧑")
+ st.write("You entered:", rater_id)
+ rater_id = "".join(rater_id.split())
+
+ exp_col, fatigue_col = st.columns([0.5, 0.5], gap="small")
+ with exp_col:
+ options = ["Beginner (< 1 year experience)", "Intermediate (1-5 year experience)", "Expert (>5 year experience)"]
+ rater_experience = st.radio("What is your QC experience level:", options)
+ st.write("Experience level:", rater_experience)
+ with fatigue_col:
+ options = ["Not at all", "A bit tired ☕", "Very tired ☕☕"]
+ rater_fatigue = st.radio("How tired are you feeling:", options)
+ st.write("Fatigue level:", rater_fatigue)
+
+ # Resolve which tasks to render
+ all_tasks = parse_all_qc_tasks(qc_config_path)
+
+ if qc_task is None or qc_task == "all":
+ tasks_to_render = all_tasks
+ elif isinstance(qc_task, list):
+ tasks_to_render = {k: all_tasks[k] for k in qc_task if k in all_tasks}
+ else:
+ tasks_to_render = {qc_task: all_tasks[qc_task]} if qc_task in all_tasks else {}
+
+ if not tasks_to_render:
+ st.error(f"No matching QC tasks found in {qc_config_path}.")
+ return
+
+ # Render panels — one tab per task when there are multiple
+ rater_kwargs = dict(
+ participant_id=participant_id,
+ session_id=session_id,
+ qc_pipeline=qc_pipeline,
+ out_dir=out_dir,
+ rater_id=rater_id,
+ rater_experience=rater_experience,
+ rater_fatigue=rater_fatigue,
+ )
- # TODO: handle list of records (i.e. multiple subjects and/or qc-tasks)
- # For now just save a single record
-
- record_list = [record]
- out_path = save_qc_results_to_csv(out_file, record_list)
- st.success(f"QC results saved to: {out_path}")
-
-
+ if len(tasks_to_render) == 1:
+ task_key, qc_config = next(iter(tasks_to_render.items()))
+ st.subheader(f"QC task: {task_key}")
+ _render_task_panels(task_key, qc_config, **rater_kwargs)
+ else:
+ tabs = st.tabs(list(tasks_to_render.keys()))
+ for tab, (task_key, qc_config) in zip(tabs, tasks_to_render.items()):
+ with tab:
+ _render_task_panels(task_key, qc_config, **rater_kwargs)
diff --git a/ui/models.py b/ui/models.py
index f22e1d7..d8c2e20 100644
--- a/ui/models.py
+++ b/ui/models.py
@@ -79,6 +79,10 @@ class QCTask(BaseModel):
Optional[List[Path]], Field(description="Path(s) to IQM TSV/JSON or other QC files")
] = None
+ mesh_paths: Annotated[
+ Optional[List[Path]], Field(description="Paths to FreeSurfer surface meshes (e.g. lh.pial, rh.pial, lh.white, rh.white)")
+ ] = None
+
class QCConfig(RootModel[Dict[str, QCTask]]):
"""Top-level model for `qc.json`.
diff --git a/ui/sample_qc_multi.json b/ui/sample_qc_multi.json
new file mode 100644
index 0000000..620214d
--- /dev/null
+++ b/ui/sample_qc_multi.json
@@ -0,0 +1,25 @@
+{
+ "anat_wf_qc": {
+ "base_mri_image_path": "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sourcedata/freesurfer/sub-CMH0066/mri/T1.mgz",
+ "svg_montage_path": [
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/figures/sub-CMH0053_desc-reconall_T1w.svg"
+ ],
+ "mesh_paths": [
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sourcedata/freesurfer/sub-CMH0066/surf/lh.white",
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sourcedata/freesurfer/sub-CMH0066/surf/rh.white",
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sourcedata/freesurfer/sub-CMH0066/surf/lh.pial",
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sourcedata/freesurfer/sub-CMH0066/surf/rh.pial"
+ ]
+ },
+ "func_wf_qc": {
+ "base_mri_image_path": "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/ses-01/func/sub-CMH0053_ses-01_task-rest_run-02_space-MNI152NLin2009cAsym_boldref.nii.gz",
+ "overlay_mri_image_path": "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/anat/sub-CMH0053_space-MNI152NLin2009cAsym_desc-brain_mask.nii.gz",
+ "svg_montage_path": [
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/figures/sub-CMH0053_ses-01_task-rest_run-02_desc-sdc_bold.svg",
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/figures/sub-CMH0053_ses-01_task-rest_run-02_desc-coreg_bold.svg",
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/figures/sub-CMH0053_ses-02_task-rest_run-02_desc-sdc_bold.svg",
+ "/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/figures/sub-CMH0053_ses-02_task-rest_run-02_desc-coreg_bold.svg"
+ ],
+ "iqm_path": ["/projects/ttan/ASCEND/data/derivatives/fmriprep/23.2.3/sub-CMH0053/figures/sub-CMH0053_ses-01_task-rest_run-02_desc-confoundcorr_bold.svg"]
+ }
+}
\ No newline at end of file
diff --git a/ui/ui.py b/ui/ui.py
index 6f340b2..841efff 100644
--- a/ui/ui.py
+++ b/ui/ui.py
@@ -34,9 +34,12 @@ def parse_args(args=None):
)
parser.add_argument(
"--qc_task",
- help=("Specific workflow output to QC"),
+ help=("One or more workflow keys to QC (e.g. anat_wf_qc func_wf_qc). "
+ "Omit to render all tasks found in the config file."),
dest="qc_task",
- required=True,
+ nargs="*",
+ default=None,
+ required=False,
)
parser.add_argument(
"--output_dir",
@@ -59,7 +62,14 @@ def parse_args(args=None):
participant_list = args.participant_list
session_list = args.session_list
qc_pipeline = args.qc_pipeline
-qc_task = args.qc_task
+# None → render all; single item list → unwrap to string; multiple → keep list
+_qc_task_arg = args.qc_task
+if _qc_task_arg is None or len(_qc_task_arg) == 0:
+ qc_task = None
+elif len(_qc_task_arg) == 1:
+ qc_task = _qc_task_arg[0]
+else:
+ qc_task = _qc_task_arg
qc_json = args.qc_json
out_dir = args.out_dir
@@ -86,7 +96,7 @@ def init_session_state():
qc_config_path = os.path.join(current_dir, qc_json)
# print(f"qc path: {qc_config_path}")
-participant_id = "sub-ED01"
+participant_id = "sub-CMH0053"
session_id = "ses-01"
app(
diff --git a/ui/utils.py b/ui/utils.py
index d416962..941dc34 100644
--- a/ui/utils.py
+++ b/ui/utils.py
@@ -2,174 +2,272 @@
from pathlib import Path
import pandas as pd
from models import QCConfig, QCRecord
-
+from bids.layout import parse_file_entities
+
+def group_svg_paths_by_run(qc_config: dict) -> dict[tuple[str | None, str | None, str | None], list[Path]]:
+ """Group svg_montage_path entries by their BIDS (session, task, run) entities.
+
+ Returns an ordered dict ``{(ses_id, task_id, run_id): [Path, ...]}``.
+ Each value is ``None`` when that entity is absent from the filename.
+ """
+ svg_paths = qc_config.get("svg_montage_path") or []
+ if not isinstance(svg_paths, list):
+ svg_paths = [svg_paths]
+
+ groups: dict[tuple[str | None, str | None, str | None], list[Path]] = {}
+ for p in svg_paths:
+ if p and p.is_file():
+ entities = parse_file_entities(str(p))
+ raw_ses = entities.get("session")
+ raw_task = entities.get("task")
+ raw_run = entities.get("run")
+ ses_key = str(raw_ses) if raw_ses is not None else None
+ task_key_bids = str(raw_task) if raw_task is not None else None
+ run_key = str(raw_run) if raw_run is not None else None
+ groups.setdefault((ses_key, task_key_bids, run_key), []).append(p)
+ return groups
+
+
+def parse_bids_entities(qc_config: dict) -> dict:
+ """Extract BIDS task and run labels from all paths in qc_config.
+
+ Scans every available path (svg_montage_path, base_mri_image_path, iqm_path)
+ and returns the first non-None value found for each entity.
+ Returns a dict with keys ``task_id`` and ``run_id`` (either str or None).
+ """
+ svg_paths = qc_config.get("svg_montage_path") or []
+ if not isinstance(svg_paths, list):
+ svg_paths = [svg_paths]
+
+ candidates = [
+ *svg_paths,
+ qc_config.get("base_mri_image_path"),
+ qc_config.get("iqm_path"),
+ ]
+
+ task_id = run_id = None
+ for path in candidates:
+ if path is None:
+ continue
+ entities = parse_file_entities(str(path))
+ if task_id is None:
+ task_id = entities.get("task")
+ if run_id is None:
+ raw_run = entities.get("run")
+ run_id = str(raw_run) if raw_run is not None else None
+ if task_id and run_id:
+ break
+
+ return {"task_id": task_id, "run_id": run_id}
+
+
+def parse_all_qc_tasks(qc_json) -> dict[str, dict]:
+ """Parse every QC task from a QC JSON file.
+
+ Returns a dict mapping task name -> config dict (same shape as
+ ``parse_qc_config``). Returns an empty dict on any error.
+ """
+ qc_json_path = Path(qc_json) if qc_json else None
+ try:
+ raw_text = qc_json_path.read_text()
+ qcconf = QCConfig.model_validate_json(raw_text)
+ except Exception:
+ return {}
+ return {
+ task_name: {
+ "base_mri_image_path": qctask.base_mri_image_path,
+ "overlay_mri_image_path": qctask.overlay_mri_image_path,
+ "svg_montage_path": qctask.svg_montage_path,
+ "iqm_path": qctask.iqm_path,
+ "mesh_paths": qctask.mesh_paths,
+ }
+ for task_name, qctask in qcconf.root.items()
+ }
def parse_qc_config(qc_json, qc_task) -> dict:
- """
- Parse a QC JSON file using the QCConfig Pydantic model.
-
- Returns a dict with keys:
- - 'base_mri_image_path': Path | None
- - 'overlay_mri_image_path': Path | None
- - 'svg_montage_path': list[Path] | None
- - 'iqm_path': list[Path] | None
-
- If the file is missing, invalid, or the requested qc_task is not present,
- all values will be None.
- """
- qc_json_path = Path(qc_json) if qc_json else None
-
- try:
- raw_text = qc_json_path.read_text()
- qcconf = QCConfig.model_validate_json(raw_text)
- except Exception:
- return {
- "base_mri_image_path": None,
- "overlay_mri_image_path": None,
- "svg_montage_path": None,
- "iqm_path": None,
- }
-
- qctask = qcconf.root.get(qc_task)
- if not qctask:
- return {
- "base_mri_image_path": None,
- "overlay_mri_image_path": None,
- "svg_montage_path": None,
- "iqm_path": None,
- }
-
- return {
- "base_mri_image_path": qctask.base_mri_image_path,
- "overlay_mri_image_path": qctask.overlay_mri_image_path,
- "svg_montage_path": qctask.svg_montage_path,
- "iqm_path": qctask.iqm_path,
- }
+ """
+ Parse a QC JSON file using the QCConfig Pydantic model.
+
+ Returns a dict with keys:
+ - 'base_mri_image_path': Path | None
+ - 'overlay_mri_image_path': Path | None
+ - 'svg_montage_path': list[Path] | None
+ - 'iqm_path': list[Path] | None
+    - 'mesh_paths': list[Path] | None
+
+ If the file is missing, invalid, or the requested qc_task is not present,
+ all values will be None.
+ """
+ qc_json_path = Path(qc_json) if qc_json else None
+
+ try:
+ # Pydantic v2 deprecates `parse_file`; read file and validate JSON string.
+ raw_text = qc_json_path.read_text()
+ qcconf = QCConfig.model_validate_json(raw_text)
+ except Exception:
+ return {
+ "base_mri_image_path": None,
+ "overlay_mri_image_path": None,
+ "svg_montage_path": None,
+ "iqm_path": None,
+ "mesh_paths": None,
+ }
+
+ qctask = qcconf.root.get(qc_task)
+ if not qctask:
+ return {
+ "base_mri_image_path": None,
+ "overlay_mri_image_path": None,
+ "svg_montage_path": None,
+ "iqm_path": None,
+ "mesh_paths": None,
+ }
+
+ # qctask is a QCTask model; its fields are Path or None already
+ return {
+ "base_mri_image_path": qctask.base_mri_image_path,
+ "overlay_mri_image_path": qctask.overlay_mri_image_path,
+ "svg_montage_path": qctask.svg_montage_path,
+ "iqm_path": qctask.iqm_path,
+ "mesh_paths": qctask.mesh_paths,
+ }
def load_mri_data(path_dict: dict) -> dict:
- """Load base and overlay MRI image files as bytes."""
- base_mri_path = path_dict.get("base_mri_image_path")
- overlay_mri_path = path_dict.get("overlay_mri_image_path")
+ """Load base and overlay MRI image files as bytes."""
+ base_mri_path = path_dict.get("base_mri_image_path")
+ overlay_mri_path = path_dict.get("overlay_mri_image_path")
- file_bytes_dict = {}
+ file_bytes_dict = {}
- if base_mri_path and Path(base_mri_path).is_file():
- file_bytes_dict["base_mri_image_bytes"] = Path(base_mri_path).read_bytes()
+ if base_mri_path and Path(base_mri_path).is_file():
+ file_bytes_dict["base_mri_image_bytes"] = Path(base_mri_path).read_bytes()
- if overlay_mri_path and Path(overlay_mri_path).is_file():
- file_bytes_dict["overlay_mri_image_bytes"] = Path(overlay_mri_path).read_bytes()
+ if overlay_mri_path and Path(overlay_mri_path).is_file():
+ file_bytes_dict["overlay_mri_image_bytes"] = Path(overlay_mri_path).read_bytes()
- return file_bytes_dict
+ return file_bytes_dict
-def load_svg_data(path_dict: dict) -> list[str]:
- """
- Load SVG montage file(s) content as strings.
- Returns a list (possibly empty).
- """
- svg_paths = path_dict.get("svg_montage_path") or []
- out = []
+def load_mesh_data(path_dict: dict) -> list:
+ """Load FreeSurfer surface mesh files as bytes for niivue_viewer."""
+ mesh_paths = path_dict.get("mesh_paths")
+ if not mesh_paths:
+ return []
+ meshes = []
+ for mesh_path in mesh_paths:
+ p = Path(mesh_path)
+ if p.is_file():
+ meshes.append({"data": p.read_bytes(), "name": p.name})
+ return meshes
- for p in svg_paths:
- p = Path(p)
- if p.is_file():
- try:
- out.append(p.read_text())
- except Exception:
- pass
- return out
+def load_svg_data(path_dict: dict) -> str | None:
+ """Load SVG montage file content as string."""
+ svg_montage_path = path_dict.get("svg_montage_path")
+ if svg_montage_path and svg_montage_path.is_file():
+ try:
+ with open(svg_montage_path, "r") as f:
+ return f.read()
+ except Exception:
+ return None
+ return None
def load_iqm_data(path_dict: dict):
- """
- Load IQM files.
- - TSV files are returned as pandas DataFrames
- - JSON files are returned as dicts
-
- Returns a list of loaded objects (possibly empty).
- """
- iqm_paths = path_dict.get("iqm_path") or []
- out = []
-
- for p in iqm_paths:
- p = Path(p)
- if not p.is_file():
- continue
-
- suffix = p.suffix.lower()
-
- if suffix == ".tsv":
- try:
- out.append(pd.read_csv(p, sep="\t"))
- except Exception:
- pass
- elif suffix == ".json":
- try:
- out.append(json.loads(p.read_text()))
- except Exception:
- pass
- else:
- try:
- out.append(p.read_text())
- except Exception:
- pass
-
- return out
+ """
+ Load IQM files.
+ - TSV files are returned as pandas DataFrames
+ - JSON files are returned as dicts
+
+ Returns a list of loaded objects (possibly empty).
+ """
+ iqm_paths = path_dict.get("iqm_path") or []
+ out = []
+
+ for p in iqm_paths:
+ p = Path(p)
+ if not p.is_file():
+ continue
+
+ suffix = p.suffix.lower()
+
+ if suffix == ".tsv":
+ try:
+ out.append(pd.read_csv(p, sep="\t"))
+ except Exception:
+ pass
+ elif suffix == ".json":
+ try:
+ out.append(json.loads(p.read_text()))
+ except Exception:
+ pass
+ else:
+ try:
+ out.append(p.read_text())
+ except Exception:
+ pass
+
+ return out
def save_qc_results_to_csv(out_file, qc_records):
-
- """
- Save QC results to a CSV/TSV file. Accepts QCRecord objects or dicts.
- Overwrites rows by identity keys.
-
- Output columns:
- qc_task, participant_id, session_id, task_id, run_id, pipeline,
- timestamp, rater_id, rater_experience, rater_fatigue, final_qc, notes
- """
- out_file = Path(out_file)
- out_file.parent.mkdir(parents=True, exist_ok=True)
-
- rows = []
-
- for rec in qc_records:
- if hasattr(rec, "model_dump"):
- rec_dict = rec.model_dump()
- elif hasattr(rec, "dict"):
- rec_dict = rec.dict()
- elif isinstance(rec, dict):
- rec_dict = rec
- else:
- continue
-
- rows.append({col: rec_dict.get(col) for col in QCRecord.csv_columns()})
-
- df_new = pd.DataFrame(rows)
-
- if out_file.exists():
- try:
- df_old = pd.read_csv(out_file, sep="\t")
- except Exception:
- df_old = pd.DataFrame()
-
- if not df_old.empty:
- df = pd.concat([df_old, df_new], ignore_index=True)
- else:
- df = df_new
- else:
- df = df_new
-
- key_cols = QCRecord.key_columns()
-
- existing_keys = [c for c in key_cols if c in df.columns]
- if existing_keys:
- df = df.drop_duplicates(subset=existing_keys, keep="last")
-
- if "participant_id" in df.columns:
- df = df.sort_values(by=["participant_id"]).reset_index(drop=True)
- df = df.reindex(columns=QCRecord.csv_columns())
-
- df.to_csv(out_file, index=False, sep="\t")
- return out_file
\ No newline at end of file
+ """
+ Save QC results to a CSV/TSV file. Accepts QCRecord objects or dicts.
+ Overwrites rows by identity keys.
+
+ Output columns:
+ qc_task, participant_id, session_id, task_id, run_id, pipeline,
+ timestamp, rater_id, rater_experience, rater_fatigue, final_qc, notes
+ """
+ out_file = Path(out_file)
+ out_file.parent.mkdir(parents=True, exist_ok=True)
+
+ rows = []
+
+ for rec in qc_records:
+ # support both model instances and plain dicts
+ if hasattr(rec, "model_dump"):
+ # pydantic v2 model -> convert to dict for uniform access
+ rec_dict = rec.model_dump()
+ elif hasattr(rec, "dict"):
+ # pydantic v1 fallback
+ rec_dict = rec.dict()
+ elif isinstance(rec, dict):
+ rec_dict = rec
+ else:
+ # Handle this better with exceptions
+ print("Unknown record format")
+
+ row = {
+ "qc_task": rec_dict.get("qc_task"),
+ "participant_id": rec_dict.get("participant_id"),
+ "session_id": rec_dict.get("session_id"),
+ "task_id": rec_dict.get("task_id"),
+ "run_id": rec_dict.get("run_id"),
+ "pipeline": rec_dict.get("pipeline"),
+ "timestamp": rec_dict.get("timestamp"),
+ "rater_id": rec_dict.get("rater_id"),
+ "rater_experience": rec_dict.get("rater_experience"),
+ "rater_fatigue": rec_dict.get("rater_fatigue"),
+ "final_qc": rec_dict.get("final_qc"),
+ "notes": rec_dict.get("notes"),
+ }
+ rows.append(row)
+
+ df = pd.DataFrame(rows)
+ if out_file.exists():
+ df_existing = pd.read_csv(out_file, sep="\t")
+ df = pd.concat([df_existing, df], ignore_index=True)
+ # Drop duplicates based on core identity columns
+ subset_keys = ["participant_id", "session_id", "pipeline", "qc_task"]
+ existing_keys = [k for k in subset_keys if k in df.columns]
+ if existing_keys:
+ df = df.drop_duplicates(subset=existing_keys, keep="last")
+
+ if "participant_id" in df.columns:
+ df = df.sort_values(by=["participant_id"]).reset_index(drop=True)
+ df = df.reindex(columns=QCRecord.csv_columns())
+
+ df.to_csv(out_file, index=False, sep="\t")
+ return out_file