Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ repos:
rev: v2.4.1
hooks:
- id: codespell
exclude: ^(dandi/_version\.py|dandi/due\.py|versioneer\.py|pyproject\.toml)$
exclude: ^(dandi/_version\.py|dandi/due\.py|versioneer\.py|pyproject\.toml|dandi/data/allen_ccf_structures\.json)$
additional_dependencies:
- tomli; python_version<'3.11'
- repo: https://github.com/PyCQA/flake8
Expand Down
Empty file added dandi/data/__init__.py
Empty file.
1 change: 1 addition & 0 deletions dandi/data/allen_ccf_structures.json

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions dandi/data/generate_allen_structures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""Regenerate allen_ccf_structures.json from Allen Brain Map API.

Run: python -m dandi.data.generate_allen_structures
"""

from __future__ import annotations

import json
from pathlib import Path

import requests


def _flatten(node: dict, out: list[dict]) -> None:
out.append({"id": node["id"], "acronym": node["acronym"], "name": node["name"]})
for child in node.get("children", []):
_flatten(child, out)


def main() -> None:
    """Fetch the Allen structure graph and write a compact JSON list next to this file.

    Downloads structure graph 1 from the Allen Brain Map API, flattens the
    tree into ``{id, acronym, name}`` records sorted by id, and writes them
    (minified) to ``allen_ccf_structures.json`` in this package directory.
    """
    url = "http://api.brain-map.org/api/v2/structure_graph_download/1.json"
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    payload = response.json()

    flat: list[dict] = []
    # The API wraps the tree root in a one-element "msg" list.
    _flatten(payload["msg"][0], flat)
    flat.sort(key=lambda record: record["id"])

    target = Path(__file__).with_name("allen_ccf_structures.json")
    # Compact separators keep the bundled data file small.
    with open(target, "w") as fh:
        json.dump(flat, fh, separators=(",", ":"))
    print(f"Wrote {len(flat)} structures to {target}")


if __name__ == "__main__":
main()
252 changes: 252 additions & 0 deletions dandi/metadata/brain_areas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
from __future__ import annotations

import ast
from functools import lru_cache
import json
from pathlib import Path
import re
from typing import Any

from dandischema import models

from .. import get_logger

lgr = get_logger()

MBAO_URI_TEMPLATE = "http://purl.obolibrary.org/obo/MBA_{}"

# Values that should be treated as missing / uninformative
_TRIVIAL_VALUES = frozenset(
{
"",
"unknown",
"none",
"n/a",
"na",
"null",
"unspecified",
"not available",
"not applicable",
"brain",
}
)


@lru_cache(maxsize=1)
def _load_allen_structures() -> list[dict[str, Any]]:
    """Read and cache the bundled Allen CCF structures JSON.

    The file lives at ``dandi/data/allen_ccf_structures.json`` relative to
    this module; the single-slot cache means it is parsed at most once.
    """
    json_path = (
        Path(__file__).resolve().parent.parent / "data" / "allen_ccf_structures.json"
    )
    with open(json_path) as fh:
        loaded: list[dict[str, Any]] = json.load(fh)
    return loaded


@lru_cache(maxsize=1)
def _build_lookup_dicts() -> (
    tuple[dict[str, dict], dict[str, dict], dict[str, dict], dict[str, dict]]
):
    """Build lookup dictionaries for Allen CCF structures.

    Returns
    -------
    tuple of 4 dicts
        (acronym_exact, acronym_lower, name_exact, name_lower)
    """
    by_acronym: dict[str, dict] = {}
    by_acronym_folded: dict[str, dict] = {}
    by_name: dict[str, dict] = {}
    by_name_folded: dict[str, dict] = {}
    for entry in _load_allen_structures():
        # Structures are sorted by id, so setdefault keeps the lowest-id
        # structure on collisions ("first match wins").
        by_acronym.setdefault(entry["acronym"], entry)
        by_acronym_folded.setdefault(entry["acronym"].lower(), entry)
        by_name.setdefault(entry["name"], entry)
        by_name_folded.setdefault(entry["name"].lower(), entry)
    return by_acronym, by_acronym_folded, by_name, by_name_folded


def _is_numeric(val: str) -> bool:
"""Return True if *val* looks like a number (int or float)."""
try:
float(val)
return True
except ValueError:
return False


# Canonicalised area key names recognised in dict-style location strings.
# Keys are normalised by lowering, stripping whitespace, and removing hyphens
# and underscores so that "brain-area", "brain_area", "BrainArea" all match.
_AREA_KEYS = frozenset(
{
"area",
"areaname",
"brainarea",
"brainregion",
"location",
"name",
"region",
"regionname",
}
)


def _normalise_key(key: str) -> str:
"""Lower-case and strip spaces, hyphens, underscores from *key*."""
return re.sub(r"[\s_-]", "", key).lower()


def _extract_area_from_dict(d: dict) -> str | None:
    """Return the first non-trivial area value from *d*, matching keys flexibly.

    Keys are normalised (case/hyphen/underscore-insensitive) and compared
    against the recognised area-key set; values that are empty or in the
    trivial-value set are skipped.
    """
    for raw_key, raw_val in d.items():
        if _normalise_key(str(raw_key)) not in _AREA_KEYS:
            continue
        candidate = str(raw_val).strip()
        if candidate and candidate.lower() not in _TRIVIAL_VALUES:
            return candidate
    return None


def _parse_location_string(location: str) -> list[str]:
    """Parse a raw NWB location string into area tokens, ignoring numeric values.

    Handles:
    - Simple strings: ``"VISp"``
    - Dict literals: ``"{'area': 'VISp', 'depth': '20'}"``
    - Key-value pairs: ``"area: VISp, depth: 175"``
    - Comma-separated lists: ``"VISp,VISrl,VISlm"``

    In the examples above, numeric depth values are ignored.  False-positive
    tokens are harmless: every token is later matched against the Allen CCF
    and unmatched ones are dropped.
    """
    location = location.strip()
    if not location or location.lower() in _TRIVIAL_VALUES:
        return []

    # Try dict literal (e.g. "{'area': 'VISp', 'depth': 20}").  EAFP: it is
    # easier to attempt the parse and handle failure than to test up front
    # whether the string is a valid dict literal.
    if location.startswith("{"):
        try:
            d = ast.literal_eval(location)
            if isinstance(d, dict):
                val = _extract_area_from_dict(d)
                if val is not None:
                    return [val]
                # If no known key, return all non-trivial, non-numeric values
                tokens = []
                for v in d.values():
                    v = str(v).strip()
                    if v and v.lower() not in _TRIVIAL_VALUES and not _is_numeric(v):
                        tokens.append(v)
                return tokens
        except (ValueError, SyntaxError):
            # Not a parseable dict literal; log and fall through to the
            # other, more permissive parsers below.
            lgr.debug("Location %r looks like a dict but failed to parse", location)

    # Try key-value format (e.g. "area: VISp, depth: 175"); the "://" guard
    # avoids treating URLs as key-value pairs.
    if re.search(r"\w+\s*:", location) and "://" not in location:
        pairs = re.split(r",\s*", location)
        kv: dict[str, str] = {}
        for pair in pairs:
            m = re.match(r"(\w+)\s*:\s*(.+)", pair.strip())
            if m:
                kv[m.group(1).lower()] = m.group(2).strip()
        if kv:
            val = _extract_area_from_dict(kv)
            if val is not None:
                return [val]
            # Fall through — return non-trivial, non-numeric values
            tokens = []
            for v in kv.values():
                if v.lower() not in _TRIVIAL_VALUES and not _is_numeric(v):
                    tokens.append(v)
            if tokens:
                return tokens

    # Comma-separated list
    if "," in location:
        tokens = [t.strip() for t in location.split(",")]
        return [t for t in tokens if t and t.lower() not in _TRIVIAL_VALUES]

    # Simple string
    return [location]


def match_location_to_allen(token: str) -> models.Anatomy | None:
    """Match a single location token against Allen CCF structures.

    Tries exact acronym, case-insensitive acronym, exact name, and
    case-insensitive name, in that order.

    Returns
    -------
    models.Anatomy or None
    """
    acr_exact, acr_folded, nm_exact, nm_folded = _build_lookup_dicts()
    query = token.strip()
    if not query:
        return None

    # Most specific first: acronyms before full names, exact before folded.
    for table, key in (
        (acr_exact, query),
        (acr_folded, query.lower()),
        (nm_exact, query),
        (nm_folded, query.lower()),
    ):
        structure = table.get(key)
        if structure is not None:
            return _structure_to_anatomy(structure)

    lgr.debug("Could not match brain location %r to Allen CCF", query)
    return None


def _structure_to_anatomy(s: dict[str, Any]) -> models.Anatomy:
    """Build an Anatomy record with the MBA ontology URI for structure *s*."""
    uri = MBAO_URI_TEMPLATE.format(s["id"])
    return models.Anatomy(identifier=uri, name=s["name"])


def locations_to_anatomy(locations: list[str]) -> list[models.Anatomy]:
    """Convert raw NWB location strings to a deduplicated Anatomy list.

    Parameters
    ----------
    locations : list[str]
        Raw location strings from NWB file.

    Returns
    -------
    list[models.Anatomy]
        Matched and deduplicated anatomy entries, in first-seen order.
    """
    matched: list[models.Anatomy] = []
    seen: set[str] = set()
    for raw in locations:
        for token in _parse_location_string(raw):
            entry = match_location_to_allen(token)
            if entry is None:
                continue
            key = str(entry.identifier)
            if key not in seen:
                seen.add(key)
                matched.append(entry)
    return matched
59 changes: 56 additions & 3 deletions dandi/metadata/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ def extract_assay_type(metadata: dict) -> list[models.AssayType] | None:


def extract_anatomy(metadata: dict) -> list[models.Anatomy] | None:
# Anatomy is now populated via wasDerivedFrom BioSample.
# This extractor is kept only as a fallback for the top-level "anatomy" field.
if "anatomy" in metadata:
return [models.Anatomy(identifier="anatomy", name=metadata["anatomy"])]
else:
Expand Down Expand Up @@ -552,22 +554,73 @@ def func(metadata: dict) -> list[M]:

def extract_wasDerivedFrom(metadata: dict) -> list[models.BioSample] | None:
    """Build the BioSample derivation chain from tissue/slice/cell identifiers.

    Samples are chained coarse-to-fine: each later sample points back at the
    previous one via ``wasDerivedFrom``, and the returned list holds the
    finest sample.  When mouse brain locations yield anatomy terms, they are
    attached to the deepest (first-created) sample in the chain, or to a
    synthetic tissue sample when no identifier fields are present.

    Returns None when no sample identifiers and no anatomy are found.
    """
    derived_from: list[models.BioSample] | None = None
    deepest: models.BioSample | None = None
    for field, sample_name in [
        ("tissue_sample_id", "tissuesample"),
        ("slice_id", "slice"),
        ("cell_id", "cell"),
    ]:
        if metadata.get(field) is not None:
            sample = models.BioSample(
                identifier=metadata[field],
                wasDerivedFrom=derived_from,
                sampleType=models.SampleType(name=sample_name),
            )
            derived_from = [sample]
            if deepest is None:
                deepest = sample

    # Compute anatomy from brain locations (mouse only)
    anatomy = _extract_brain_anatomy(metadata)
    if anatomy:
        if deepest is not None:
            # Add anatomy to the deepest (first created) BioSample
            deepest.anatomy = anatomy
        else:
            # No existing chain — create a new BioSample for the anatomy
            derived_from = [
                models.BioSample(
                    identifier="brain-region-sample",
                    sampleType=models.SampleType(name="tissuesample"),
                    anatomy=anatomy,
                )
            ]

    return derived_from


def _is_mouse(metadata: dict) -> bool:
    """Return True if the species in *metadata* is mouse.

    Reuses the canonical mouse entry from *species_map* to match against
    common names, the Latin prefix, and the NCBITaxon identifier.
    """
    species = metadata.get("species")
    if not species:
        return False
    needle = str(species).lower()
    # species_map[0] = (["mouse"], "mus", NCBITaxon_10090 URI, full name)
    common_names, prefix, uri, _name = species_map[0]
    if "10090" in needle or uri.lower() in needle:
        return True
    if prefix is not None and needle.startswith(prefix):
        return True
    return any(name in needle for name in common_names)


def _extract_brain_anatomy(metadata: dict) -> list[models.Anatomy]:
    """Extract brain anatomy from metadata, if the species is mouse."""
    # Imported locally to avoid a circular import at module load time.
    from .brain_areas import locations_to_anatomy

    locations = metadata.get("brain_locations")
    if not locations or not _is_mouse(metadata):
        return []
    return locations_to_anatomy(locations)


extract_wasAttributedTo = extract_model_list(
models.Participant, "identifier", "subject_id", id=None
)
Expand Down
Loading
Loading