Merged
21 commits
8cc6722
Merge remote-tracking branch 'origin/main' into develop
maxinelevesque Feb 22, 2026
b5dade2
feat: replace empty lexicons/ with atdata-lexicon submodule pinned to…
maxinelevesque Feb 25, 2026
81a832b
Merge pull request #31 from forecast-bio/feature/point-lexicon-at-atd…
maxine-at-forecast Feb 26, 2026
c9108b1
feat: add sendInteractions XRPC procedure for usage telemetry
maxinelevesque Feb 26, 2026
92bb19a
feat: add skeleton/hydration pattern for third-party dataset indexes
maxinelevesque Feb 26, 2026
6e8867d
fix: add missing did field to label/lens serializers, deduplicate rec…
maxinelevesque Feb 26, 2026
bb72c9c
fix: add missing did field to row_to_label/row_to_lens, batch mechani…
maxinelevesque Feb 26, 2026
74eaf45
feat: add subscribeChanges WebSocket endpoint for real-time change st…
maxinelevesque Feb 26, 2026
beb9e63
docs: add subscribeChanges to CHANGELOG
maxinelevesque Feb 26, 2026
37f2240
feat: add array format type recognition and ndarray v1.1.0 annotation…
maxinelevesque Feb 26, 2026
a923e1e
Merge pull request #33 from forecast-bio/feature/usage-telemetry-send…
maxine-at-forecast Feb 26, 2026
9fdd407
Merge remote-tracking branch 'origin/develop' into feature/skeleton-h…
maxinelevesque Feb 26, 2026
6b35a1f
Merge pull request #34 from forecast-bio/feature/skeleton-hydration-g…
maxine-at-forecast Feb 26, 2026
f43eee4
Merge remote-tracking branch 'origin/develop' into feature/realtime-c…
maxinelevesque Feb 26, 2026
3283a22
Merge pull request #35 from forecast-bio/feature/realtime-change-stre…
maxine-at-forecast Feb 26, 2026
e0496be
Merge pull request #36 from forecast-bio/feature/array-format-types-n…
maxine-at-forecast Feb 26, 2026
e1e71c7
Merge branch 'develop' into release/v0.4.0b1
maxinelevesque Feb 26, 2026
60e9b4b
release: prepare v0.4.0b1
maxinelevesque Feb 26, 2026
991e6df
fix: address critical findings from adversarial review
maxinelevesque Feb 26, 2026
1260f6e
fix: address warning-level findings from adversarial review
maxinelevesque Feb 26, 2026
0de1047
docs: update CHANGELOG with security and fix sections from adversaria…
maxinelevesque Feb 26, 2026
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
@@ -15,6 +15,8 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true
      - uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
@@ -28,6 +30,8 @@
        python-version: ["3.12", "3.13"]
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true
      - uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
@@ -56,6 +60,8 @@ jobs:
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true
      - name: Apply schema
        env:
          PGHOST: localhost
@@ -105,6 +111,8 @@
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true
      - uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
2 changes: 2 additions & 0 deletions .github/workflows/publish.yml
@@ -12,6 +12,8 @@ jobs:
      id-token: write
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true
      - uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "lexicons"]
	path = lexicons
	url = https://github.com/forecast-bio/atdata-lexicon.git
36 changes: 36 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/),
and this project adheres to [Semantic Versioning](https://semver.org/).

## [0.4.0b1] - 2026-02-26

### Added

- `sendInteractions` XRPC procedure for anonymous usage telemetry — fire-and-forget reporting of download, citation, and derivative events on datasets ([#21](https://github.com/forecast-bio/atdata-app/issues/21))
- Skeleton/hydration pattern for third-party dataset indexes — `getIndexSkeleton`, `getIndex`, `listIndexes`, and `publishIndex` endpoints following Bluesky's feed generator model ([#20](https://github.com/forecast-bio/atdata-app/issues/20))
- `subscribeChanges` WebSocket endpoint for real-time change streaming — in-memory event bus broadcasts create/update/delete events to subscribers with cursor-based replay ([#22](https://github.com/forecast-bio/atdata-app/issues/22))
- Array format type recognition (`sparseBytes`, `structuredBytes`, `arrowTensor`, `safetensors`) and ndarray v1.1.0 annotation display (`dtype`, `shape`, `dimensionNames`) in frontend templates ([#30](https://github.com/forecast-bio/atdata-app/issues/30))
- `atdata-lexicon` git submodule at `lexicons/` pinned to v0.2.1b1 for reference and CI validation ([#27](https://github.com/forecast-bio/atdata-app/issues/27))
- CI checkout steps now initialize submodules

### Changed

- Ingestion processor refactored to use `UPSERT_FNS` dispatch dict instead of if/elif chain
- Index provider records (`science.alt.dataset.index`) added to `COLLECTION_TABLE_MAP` for firehose ingestion

### Security

- **SSRF protection**: Skeleton fetch now validates endpoint URLs with DNS resolution and blocks private/reserved IP ranges at both fetch time and firehose ingestion time
- **Auth**: `sendInteractions` endpoint now requires ATProto service auth (was previously unauthenticated)
- **XSS**: Storage URLs in dataset detail pages are only rendered as clickable links when using `http(s)://` schemes, preventing `javascript:` URI injection
- **Input validation**: `publishIndex` rejects endpoint URLs containing embedded credentials or fragments; `sendInteractions` validates that URIs reference the `science.alt.dataset.entry` collection
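A minimal sketch of the kind of endpoint check the SSRF and input-validation items describe (`is_safe_endpoint` is a hypothetical name; the shipped validator may differ in detail, e.g. in how it handles IPv6 scopes or re-resolves at fetch time):

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_safe_endpoint(url: str) -> bool:
    """Reject URLs that could be used for SSRF against internal services."""
    parsed = urlparse(url)
    # Only http(s), and the URL must actually name a host.
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    # publishIndex-style checks: no embedded credentials, no fragment.
    if parsed.username or parsed.password or parsed.fragment:
        return False
    # Resolve the hostname and reject anything outside global address space
    # (private, loopback, link-local, and reserved ranges all fail is_global).
    try:
        infos = socket.getaddrinfo(parsed.hostname, parsed.port or 443)
    except socket.gaierror:
        return False
    for _family, _type, _proto, _canon, sockaddr in infos:
        if not ipaddress.ip_address(sockaddr[0]).is_global:
            return False
    return True
```

Checking every resolved address (rather than just the first) matters because a hostname can publish both a public and a private record.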

### Fixed

- **ChangeStream backpressure**: Subscribers that fall behind are now tracked and explicitly disconnected with WebSocket close code 4000, instead of silently dropping events
- **ChangeStream subscriber limit**: Capped at 1000 concurrent subscribers; new connections receive close code 1013 when full
- **WebSocket keepalive**: Restructured the `subscribeChanges` event loop so the 30-second idle keepalive correctly re-enters the processing loop (was previously broken)
- **Replay deduplication**: Track last replayed sequence number to prevent duplicate events when replay buffer overlaps with the live queue
- **Task GC**: Fire-and-forget analytics tasks now retain references to prevent garbage collection before completion
- **Skeleton response cap**: Enforce the requested `limit` on items returned by external index providers, and cap response body size to 1 MiB
- **Skeleton item sanitization**: Whitelist upstream skeleton items to only the `uri` field; validate cursor strings for length and null bytes
- **Query guard**: `query_get_entries` now rejects requests with more than 100 keys to prevent unbounded OR-clause queries
- **Template robustness**: `shape` and `dimensionNames` join filters now guard against non-iterable data from malformed firehose records
- Removed dead `_validate_iso8601` timestamp validation code from `sendInteractions`
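The replay-deduplication fix can be illustrated in isolation: track the highest sequence number already delivered during replay, and skip anything at or below it when draining the live queue (a simplified sketch, not the shipped handler):

```python
def merge_replay_and_live(replayed: list[dict], live: list[dict]) -> list[dict]:
    """Deliver replayed events first, then live events not already replayed."""
    delivered: list[dict] = []
    last_seq = 0
    for ev in replayed:
        delivered.append(ev)
        last_seq = ev["seq"]
    for ev in live:
        if ev["seq"] <= last_seq:
            continue  # duplicate: already delivered during replay
        delivered.append(ev)
        last_seq = ev["seq"]
    return delivered
```

Without the `last_seq` check, any event published between the replay snapshot and queue attachment would reach the subscriber twice.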

## [0.3.0b1] - 2026-02-22

### Changed
10 changes: 10 additions & 0 deletions CLAUDE.md
@@ -28,6 +28,16 @@ uv run ruff check src/ tests/
uv run uvicorn atdata_app.main:app --reload
```

## Lexicon Submodule

The `lexicons/` directory is a git submodule pointing to [forecast-bio/atdata-lexicon](https://github.com/forecast-bio/atdata-lexicon), which contains the authoritative `science.alt.dataset.*` lexicon definitions. The submodule is for reference and CI validation — the Python source code does not read lexicon files at runtime.

After cloning, initialize the submodule:

```bash
git submodule update --init
```

## Architecture

### Data Flow
13 changes: 13 additions & 0 deletions README.md
Expand Up @@ -37,6 +37,9 @@ ATProto Network
# Install dependencies
uv sync --dev

# Initialize the lexicon submodule
git submodule update --init

# Set up PostgreSQL (schema auto-applies on startup)
createdb atdata_app

@@ -135,6 +138,16 @@ uv run ruff check src/ tests/

Tests mock all external dependencies (database, HTTP, identity resolution) using `unittest.mock.AsyncMock`. HTTP endpoint tests use httpx `ASGITransport` for in-process testing without a running server.

### Lexicon Definitions

The `lexicons/` directory is a [git submodule](https://github.com/forecast-bio/atdata-lexicon) containing the authoritative `science.alt.dataset.*` lexicon schemas. Initialize it with:

```bash
git submodule update --init
```

The lexicons are for reference and CI validation. The Python source code uses hardcoded NSID constants and does not read the lexicon JSON files at runtime.

## License

MIT
1 change: 1 addition & 0 deletions lexicons
Submodule lexicons added at ece47a
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "atdata-app"
version = "0.3.0b1"
version = "0.4.0b1"
description = "ATProto AppView for science.alt.dataset"
readme = "README.md"
authors = [
167 changes: 167 additions & 0 deletions src/atdata_app/changestream.py
@@ -0,0 +1,167 @@
"""In-memory broadcast channel for real-time change events.

Provides a pub/sub mechanism that the ingestion processor publishes to
and WebSocket subscribers consume from. Maintains a bounded buffer of
recent events for cursor-based replay.
"""

from __future__ import annotations

import asyncio
import logging
from collections import deque
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)

DEFAULT_BUFFER_SIZE = 1000
DEFAULT_SUBSCRIBER_QUEUE_SIZE = 256
DEFAULT_MAX_SUBSCRIBERS = 1000


@dataclass
class ChangeEvent:
"""A single change event in the stream."""

seq: int
type: str # "create", "update", or "delete"
collection: str
did: str
rkey: str
timestamp: str
record: dict[str, Any] | None = None
cid: str | None = None

def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"seq": self.seq,
"type": self.type,
"collection": self.collection,
"did": self.did,
"rkey": self.rkey,
"timestamp": self.timestamp,
}
if self.record is not None:
d["record"] = self.record
if self.cid is not None:
d["cid"] = self.cid
return d


@dataclass
class ChangeStream:
"""Broadcast channel with bounded replay buffer.

Thread-safe for asyncio: all mutations happen in the event loop.
"""

buffer_size: int = DEFAULT_BUFFER_SIZE
subscriber_queue_size: int = DEFAULT_SUBSCRIBER_QUEUE_SIZE
max_subscribers: int = DEFAULT_MAX_SUBSCRIBERS
_seq: int = field(default=0, init=False)
_buffer: deque[ChangeEvent] = field(init=False)
_subscribers: dict[int, asyncio.Queue[ChangeEvent]] = field(
default_factory=dict, init=False
)
_dropped_subs: set[int] = field(default_factory=set, init=False)
_next_sub_id: int = field(default=0, init=False)

def __post_init__(self) -> None:
self._buffer = deque(maxlen=self.buffer_size)

def publish(self, event: ChangeEvent) -> None:
"""Publish an event to all subscribers and the replay buffer.

Non-blocking. If a subscriber's queue is full, the subscriber is
marked as dropped so the WebSocket handler can close the connection.
"""
self._seq += 1
event.seq = self._seq
self._buffer.append(event)

for sub_id, queue in list(self._subscribers.items()):
try:
queue.put_nowait(event)
except asyncio.QueueFull:
logger.warning(
"Subscriber %d queue full at seq=%d — marking for disconnect",
sub_id,
event.seq,
)
self._dropped_subs.add(sub_id)

def subscribe(self) -> tuple[int, asyncio.Queue[ChangeEvent]]:
"""Create a new subscriber. Returns (subscriber_id, queue).

Raises ``RuntimeError`` if the maximum subscriber count is reached.
"""
if len(self._subscribers) >= self.max_subscribers:
raise RuntimeError(
f"Maximum subscriber count ({self.max_subscribers}) reached"
)
sub_id = self._next_sub_id
self._next_sub_id += 1
queue: asyncio.Queue[ChangeEvent] = asyncio.Queue(
maxsize=self.subscriber_queue_size
)
self._subscribers[sub_id] = queue
logger.debug("Subscriber %d connected (total: %d)", sub_id, len(self._subscribers))
return sub_id, queue

def unsubscribe(self, sub_id: int) -> None:
"""Remove a subscriber."""
self._subscribers.pop(sub_id, None)
self._dropped_subs.discard(sub_id)
logger.debug("Subscriber %d disconnected (total: %d)", sub_id, len(self._subscribers))

def is_dropped(self, sub_id: int) -> bool:
"""Return True if the subscriber was dropped due to backpressure."""
return sub_id in self._dropped_subs

def replay_from(self, cursor: int) -> list[ChangeEvent]:
"""Return buffered events with seq > cursor.

Returns an empty list if the cursor is outside the buffer window.
"""
if not self._buffer:
return []

oldest_seq = self._buffer[0].seq
if cursor < oldest_seq - 1:
# Cursor is too old — events between cursor and buffer start were lost
return []

return [ev for ev in self._buffer if ev.seq > cursor]

@property
def current_seq(self) -> int:
return self._seq

@property
def subscriber_count(self) -> int:
return len(self._subscribers)


def make_change_event(
*,
event_type: str,
collection: str,
did: str,
rkey: str,
record: dict[str, Any] | None = None,
cid: str | None = None,
) -> ChangeEvent:
"""Factory for creating change events with current timestamp."""
from datetime import datetime, timezone

return ChangeEvent(
seq=0, # Assigned by ChangeStream.publish()
type=event_type,
collection=collection,
did=did,
rkey=rkey,
timestamp=datetime.now(timezone.utc).isoformat(),
record=record,
cid=cid,
)