Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .atdata-ecosystem.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"role": "sdk",
"language": "python",
"lexicon_sync": {"method": "tarball", "ref": "v1.2.0"},
"capabilities": [
"shard_read_write",
"schema_publish",
"schema_resolve_xrpc",
"dataset_search_xrpc",
"lens_transforms",
"manifest_queries",
"load_dataset_hf_api",
"atmosphere_crud",
"label_resolve_xrpc",
"blob_resolve_xrpc"
]
}
32 changes: 32 additions & 0 deletions .github/workflows/compat-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Lexicon compatibility check.
#
# Triggered by atdata-lexicon's dispatch-consumers workflow when a PR
# proposes lexicon changes. Syncs lexicons to the proposed ref and runs
# the full test suite to detect breaking changes.

name: Lexicon Compat Check

on:
  # Fired remotely via the GitHub API (repository_dispatch), not by pushes
  # to this repo; the event type must match the sender's configured type.
  repository_dispatch:
    types: [lexicon-compat-check]

jobs:
  compat:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v4

      - name: Set up Python
        run: uv python install

      - name: Install dependencies
        run: uv sync

      # client_payload.lexicon_ref is supplied by the dispatching workflow;
      # it is the git ref of the proposed lexicon change under test.
      - name: Sync lexicons to proposed ref
        run: just sync-lexicons ref=${{ github.event.client_payload.lexicon_ref }}

      - name: Run tests
        run: uv run pytest tests/
21 changes: 18 additions & 3 deletions src/atdata/atmosphere/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,25 @@ def xrpc_procedure(
raise AppViewUnavailableError(self._appview_url, str(exc)) from exc

# ------------------------------------------------------------------ #
# Cross-account reads via bsky.social AppView (existing behavior)
# Cross-account reads via generic AppView (Tier 1)
# ------------------------------------------------------------------ #

_APPVIEW_URL = "https://bsky.social"
_GENERIC_APPVIEW_URL: str | None = None

@classmethod
def _get_generic_appview_url(cls) -> str:
    """Return the generic AppView URL for unauthenticated cross-account reads.

    Reads from the ``ATDATA_GENERIC_APPVIEW`` environment variable,
    falling back to ``https://bsky.social``. The resolved value is cached
    on the class, so the environment is consulted at most once.
    """
    cached = cls._GENERIC_APPVIEW_URL
    if cached is None:
        import os

        cached = os.environ.get("ATDATA_GENERIC_APPVIEW", "https://bsky.social")
        cls._GENERIC_APPVIEW_URL = cached
    return cached

def _get_appview_client(self) -> Any:
"""Return a shared, unauthenticated client pointed at the public AppView.
Expand All @@ -488,7 +503,7 @@ def _get_appview_client(self) -> Any:
"""
if not hasattr(self, "_appview_client") or self._appview_client is None:
Client = _get_atproto_client_class()
self._appview_client = Client(base_url=self._APPVIEW_URL)
self._appview_client = Client(base_url=self._get_generic_appview_url())
return self._appview_client

# Low-level record operations
Expand Down
101 changes: 101 additions & 0 deletions tests/test_lexicon_drift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Lexicon drift detection tests.

Ensures every property defined in the lexicon JSON files has a corresponding
field in the Python dataclass types in ``_lexicon_types.py``. When a new
property is added to a lexicon but not to the Python type, this test fails.
"""

from __future__ import annotations

import json
import re
from pathlib import Path

import pytest

from atdata.atmosphere._lexicon_types import (
LexDatasetEntry,
LexLabelRecord,
LexLensRecord,
LexLensVerification,
LexSchemaRecord,
)

# Directory containing the vendored lexicon JSON files for the
# science.alt.dataset namespace (relative to the repo root).
LEXICON_DIR = (
    Path(__file__).resolve().parent.parent
    / "src"
    / "atdata"
    / "lexicons"
    / "science"
    / "alt"
    / "dataset"
)

# Map lexicon JSON file stems to their corresponding Python types.
RECORD_TYPES: dict[str, type] = {
    "entry": LexDatasetEntry,
    "schema": LexSchemaRecord,
    "lens": LexLensRecord,
    "label": LexLabelRecord,
    "lensVerification": LexLensVerification,
}


def _camel_to_snake(name: str) -> str:
"""Convert camelCase to snake_case."""
s1 = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower()


def _get_lexicon_properties(lexicon_path: Path) -> set[str]:
"""Extract main record property names from a lexicon JSON file."""
data = json.loads(lexicon_path.read_text())
main_def = data.get("defs", {}).get("main", {})
record = main_def.get("record", {})
return set(record.get("properties", {}).keys())


def _get_dataclass_fields(cls: type) -> set[str]:
"""Get field names from a dataclass, including inherited fields."""
import dataclasses

return {f.name for f in dataclasses.fields(cls)}


def _build_test_cases() -> list[tuple[str, Path, type]]:
    """Build parametrized test cases for each lexicon/type pair.

    Pairs whose lexicon JSON file is absent on disk are silently omitted.
    """
    return [
        (stem, path, py_type)
        for stem, py_type in RECORD_TYPES.items()
        if (path := LEXICON_DIR / f"{stem}.json").exists()
    ]


# Snapshot the cases once: the original called _build_test_cases() twice
# (for argvalues and again for ids), scanning the filesystem twice and
# risking a mismatch between ids and cases if files change between calls.
_TEST_CASES = _build_test_cases()


@pytest.mark.parametrize(
    "stem,lexicon_path,py_type",
    _TEST_CASES,
    ids=[case[0] for case in _TEST_CASES],
)
def test_lexicon_properties_covered_by_python_type(
    stem: str, lexicon_path: Path, py_type: type
) -> None:
    """Every lexicon record property must have a Python dataclass field.

    Compares the camelCase property names from the lexicon JSON against the
    snake_case field names of the corresponding dataclass and fails with a
    list of any properties that have no matching field.
    """
    lexicon_props = _get_lexicon_properties(lexicon_path)
    python_fields = _get_dataclass_fields(py_type)

    # Build mapping: camelCase lexicon prop -> expected snake_case Python field
    missing = []
    for prop in sorted(lexicon_props):
        # Skip $type — it's lexicon metadata, not a stored field
        if prop.startswith("$"):
            continue
        expected_field = _camel_to_snake(prop)
        if expected_field not in python_fields:
            missing.append(f"  {prop} -> {expected_field}")

    assert not missing, (
        f"Lexicon '{stem}' has properties not covered by {py_type.__name__}:\n"
        + "\n".join(missing)
        + "\nAdd the missing fields to the Python dataclass."
    )
170 changes: 170 additions & 0 deletions tests/test_vectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""Cross-repo test vector runner.

Executes shard-roundtrip test vectors from ``test-vectors/shard-roundtrip/``
in the vendored lexicons directory. Each vector specifies a schema, a set of
input samples, and expected outputs. The runner writes samples to a shard,
reads them back, and verifies against expected values.

Skips gracefully if the test-vectors directory does not exist yet.
"""

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np
import pytest

import atdata

# Candidate vector locations, tried in order:
#   1. test-vectors/ at this repo's root
#   2. a path that climbs out of src/atdata/lexicons (four "..": resolves to
#      a test-vectors dir next to the repo — presumably an atdata-lexicon
#      checkout; TODO confirm the intended layout)
_REPO_ROOT = Path(__file__).resolve().parent.parent
_CANDIDATE_PATHS = [
    _REPO_ROOT / "test-vectors" / "shard-roundtrip",
    _REPO_ROOT
    / "src"
    / "atdata"
    / "lexicons"
    / ".."
    / ".."
    / ".."
    / ".."
    / "test-vectors"
    / "shard-roundtrip",
]

# First existing candidate wins; None keeps the vector tests skipped.
VECTORS_DIR: Path | None = None
for _p in _CANDIDATE_PATHS:
    _resolved = _p.resolve()
    if _resolved.is_dir():
        VECTORS_DIR = _resolved
        break


def _collect_vector_files() -> list[Path]:
    """Return every vector JSON file, sorted; empty when no vectors dir was found."""
    vectors_dir = VECTORS_DIR
    return [] if vectors_dir is None else sorted(vectors_dir.glob("*.json"))


def _make_packable_class(
    schema_def: dict[str, Any],
) -> type:
    """Dynamically create a packable dataclass from a vector schema definition.

    Each entry in ``schema_def["fields"]`` becomes an annotated field; type
    strings not in the mapping fall back to ``Any``.
    """
    type_map: dict[str, Any] = {
        "str": str,
        "int": int,
        "float": float,
        "dict": dict,
        "ndarray": np.ndarray,
    }
    annotations: dict[str, type] = {
        field_def["name"]: type_map.get(field_def["type"], Any)
        for field_def in schema_def["fields"]
    }

    bare_cls = type("VectorSample", (), {"__annotations__": annotations})
    return atdata.packable(dataclass(bare_cls))


def _make_sample(cls: type, sample_data: dict[str, Any], schema_def: dict) -> Any:
"""Create a sample instance from vector data."""
kwargs: dict[str, Any] = {}
field_types = {f["name"]: f for f in schema_def["fields"]}

for fname, fdef in field_types.items():
val = sample_data.get(fname)
if fdef["type"] == "ndarray" and val is not None:
dtype = fdef.get("dtype", "float32")
val = np.array(val, dtype=dtype)
kwargs[fname] = val

kwargs["__key__"] = sample_data["__key__"]
return cls(**kwargs)


def _check_field_value(actual: Any, expected: Any, field_name: str, key: str) -> None:
"""Assert a single field value matches the expected value."""
if isinstance(expected, dict) and "dtype" in expected and "values" in expected:
# NDArray check
assert isinstance(actual, np.ndarray), (
f"Sample {key}.{field_name}: expected ndarray, got {type(actual)}"
)
np.testing.assert_array_almost_equal(
actual,
np.array(expected["values"], dtype=expected["dtype"]),
err_msg=f"Sample {key}.{field_name} values mismatch",
)
assert list(actual.shape) == expected["shape"], (
f"Sample {key}.{field_name} shape mismatch: "
f"{list(actual.shape)} != {expected['shape']}"
)
else:
assert actual == expected, (
f"Sample {key}.{field_name}: {actual!r} != {expected!r}"
)


_vector_files = _collect_vector_files()


@pytest.mark.skipif(
    not _vector_files,
    reason="No test vectors found (test-vectors/shard-roundtrip/ not present)",
)
@pytest.mark.parametrize(
    "vector_path",
    _vector_files,
    ids=[p.stem for p in _vector_files],
)
def test_shard_roundtrip_vector(vector_path: Path, tmp_path: Path) -> None:
    """Write samples from a test vector to a shard, read back, verify."""
    spec = json.loads(vector_path.read_text())

    schema_def = spec["inputs"]["schema"]
    sample_cls = _make_packable_class(schema_def)

    # Build the input samples described by the vector.
    originals = [
        _make_sample(sample_cls, raw, schema_def) for raw in spec["inputs"]["samples"]
    ]

    # Round-trip: write to a shard, then read everything back.
    shard_path = tmp_path / "test-shard-000000.tar"
    atdata.write_samples(originals, str(shard_path))
    roundtripped = list(atdata.Dataset[sample_cls](str(shard_path)))

    expected = spec["expected"]

    # Sample count must survive the round trip.
    assert len(roundtripped) == expected["sample_count"], (
        f"Expected {expected['sample_count']} samples, got {len(roundtripped)}"
    )

    # Keys must come back in the expected order.
    observed_keys = [s.__key__ for s in roundtripped]
    assert observed_keys == expected["keys"], (
        f"Key mismatch: {observed_keys} != {expected['keys']}"
    )

    # Spot-check individual field values per the vector's field_checks.
    by_key = {s.__key__: s for s in roundtripped}
    for key, field_checks in expected.get("field_checks", {}).items():
        sample = by_key[key]
        for field_name, expected_val in field_checks.items():
            _check_field_value(
                getattr(sample, field_name), expected_val, field_name, key
            )
Loading