Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/).
- DID document service entry updated from `#atproto_appview` / `AtprotoAppView` to `#atdata_appview` / `AtdataAppView`

### Added
- Adversarial review: sendInteractions feature and surrounding code (round 3) (#39)
- Add sendInteractions XRPC procedure for usage telemetry (#35)

- Dual-hostname DID document support — serve different `did:web` documents for `api.atdata.app` (appview identity) and `atdata.app` (atproto account identity) based on the `Host` header ([#19](https://github.com/forecast-bio/atdata-app/issues/19))
- Host-based route gating middleware — frontend HTML routes are only served on the frontend hostname; the API subdomain serves only XRPC, health, and DID endpoints
Expand Down
28 changes: 17 additions & 11 deletions src/atdata_app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,17 @@ async def query_search_lenses(
)


async def _fetch_record_counts(conn: asyncpg.Connection) -> dict[str, int]:
    """Count rows in every record table, keyed by collection NSID."""
    totals: dict[str, int] = {}
    for collection, table in COLLECTION_TABLE_MAP.items():
        # Table names come from the static COLLECTION_TABLE_MAP, not user
        # input, so interpolating them into the query is safe.
        result = await conn.fetchrow(f"SELECT COUNT(*) as cnt FROM {table}")  # noqa: S608
        totals[collection] = result["cnt"]
    return totals


async def query_record_counts(pool: asyncpg.Pool) -> dict[str, int]:
    """Return per-collection record counts using a connection from *pool*.

    Delegates to ``_fetch_record_counts`` so the counting logic can be shared
    with callers that already hold a connection (e.g. the analytics summary).
    """
    # NOTE(review): the previous version retained the superseded inline loop
    # followed by an unreachable duplicate return — diff/merge residue removed.
    async with pool.acquire() as conn:
        return await _fetch_record_counts(conn)


async def query_labels_for_dataset(
Expand Down Expand Up @@ -692,11 +696,7 @@ async def query_analytics_summary(
{"term": r["term"], "count": r["count"]} for r in term_rows
]

# Record counts
counts = {}
for collection, table in COLLECTION_TABLE_MAP.items():
c = await conn.fetchrow(f"SELECT COUNT(*) AS cnt FROM {table}") # noqa: S608
counts[collection] = c["cnt"]
counts = await _fetch_record_counts(conn)

return {
"totalViews": total_views,
Expand All @@ -720,7 +720,10 @@ async def query_entry_stats(
"""
SELECT
COUNT(*) FILTER (WHERE event_type = 'view_entry') AS views,
COUNT(*) FILTER (WHERE event_type = 'search') AS search_appearances
COUNT(*) FILTER (WHERE event_type = 'search') AS search_appearances,
COUNT(*) FILTER (WHERE event_type = 'download') AS downloads,
COUNT(*) FILTER (WHERE event_type = 'citation') AS citations,
COUNT(*) FILTER (WHERE event_type = 'derivative') AS derivatives
FROM analytics_events
WHERE target_did = $1 AND target_rkey = $2
AND created_at >= NOW() - $3::interval
Expand All @@ -732,6 +735,9 @@ async def query_entry_stats(
return {
"views": row["views"],
"searchAppearances": row["search_appearances"],
"downloads": row["downloads"],
"citations": row["citations"],
"derivatives": row["derivatives"],
"period": period,
}

Expand Down
10 changes: 6 additions & 4 deletions src/atdata_app/frontend/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from atdata_app.models import (
maybe_cursor,
parse_at_uri,
parse_cursor,
row_to_entry,
row_to_label,
Expand Down Expand Up @@ -100,10 +101,11 @@ async def dataset_detail(request: Request, did: str, rkey: str):
schema_did = ""
schema_rkey = ""
schema_ref = entry.get("schemaRef", "")
if schema_ref.startswith("at://"):
parts = schema_ref[5:].split("/", 2)
if len(parts) == 3:
schema_did, _, schema_rkey = parts
if schema_ref:
try:
schema_did, _, schema_rkey = parse_at_uri(schema_ref)
except ValueError:
pass

# Fetch labels pointing to this dataset
dataset_uri = entry["uri"]
Expand Down
9 changes: 1 addition & 8 deletions src/atdata_app/ingestion/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,7 @@ async def process_commit(pool: asyncpg.Pool, event: dict[str, Any]) -> None:
record = commit["record"]
cid = commit.get("cid")
try:
if table == "schemas":
await db.upsert_schema(pool, did, rkey, cid, record)
elif table == "entries":
await db.upsert_entry(pool, did, rkey, cid, record)
elif table == "labels":
await db.upsert_label(pool, did, rkey, cid, record)
elif table == "lenses":
await db.upsert_lens(pool, did, rkey, cid, record)
await db.UPSERT_FNS[table](pool, did, rkey, cid, record)
logger.debug("Upserted %s %s/%s", collection, did, rkey)
except Exception:
logger.exception("Failed to upsert %s %s/%s", collection, did, rkey)
5 changes: 5 additions & 0 deletions src/atdata_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def row_to_label(row) -> dict[str, Any]:
d: dict[str, Any] = {
"uri": uri,
"cid": row["cid"],
"did": row["did"],
"name": row["name"],
"datasetUri": row["dataset_uri"],
"createdAt": row["created_at"],
Expand All @@ -150,6 +151,7 @@ def row_to_lens(row) -> dict[str, Any]:
d: dict[str, Any] = {
"uri": uri,
"cid": row["cid"],
"did": row["did"],
"name": row["name"],
"sourceSchema": row["source_schema"],
"targetSchema": row["target_schema"],
Expand Down Expand Up @@ -236,4 +238,7 @@ class GetAnalyticsResponse(BaseModel):
class GetEntryStatsResponse(BaseModel):
    """Aggregated analytics counters for a single dataset entry over a period."""

    views: int
    searchAppearances: int
    # Telemetry counters recorded via sendInteractions; default to 0 so
    # responses built before these event types existed still validate.
    downloads: int = 0
    citations: int = 0
    derivatives: int = 0
    period: str
79 changes: 79 additions & 0 deletions src/atdata_app/xrpc/procedures.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

import logging
from datetime import datetime
from typing import Any

import httpx
Expand All @@ -15,6 +16,7 @@
from atdata_app import get_resolver
from atdata_app.auth import verify_service_auth
from atdata_app.database import (
fire_analytics_event,
query_get_entry,
query_get_schema,
query_record_exists,
Expand Down Expand Up @@ -268,3 +270,80 @@ async def publish_lens(request: Request) -> dict[str, Any]:
pds, pds_token, auth.iss, "science.alt.dataset.lens", record, rkey
)
return {"uri": result.get("uri"), "cid": result.get("cid")}


# ---------------------------------------------------------------------------
# sendInteractions
# ---------------------------------------------------------------------------

# Interaction event types accepted by the sendInteractions procedure.
_VALID_INTERACTION_TYPES = frozenset({"download", "citation", "derivative"})
# Hard cap on the number of interactions accepted in one request body.
_MAX_INTERACTIONS_BATCH = 100


def _validate_iso8601(value: str) -> None:
"""Raise ValueError if *value* is not a valid ISO 8601 datetime string."""
# datetime.fromisoformat handles the common subset we accept
datetime.fromisoformat(value)


@router.post("/science.alt.dataset.sendInteractions")
async def send_interactions(request: Request) -> dict[str, Any]:
    """Record usage telemetry events (download/citation/derivative) for datasets.

    Expects a JSON body with an ``interactions`` array of objects, each having
    a ``type`` (member of ``_VALID_INTERACTION_TYPES``), a ``datasetUri``
    AT-URI, and an optional ISO 8601 ``timestamp`` string. The whole batch is
    validated before any event is recorded, so one invalid item rejects the
    entire request with HTTP 400.
    """
    pool = request.app.state.db_pool

    body = await request.json()
    interactions = body.get("interactions")
    if not isinstance(interactions, list):
        raise HTTPException(status_code=400, detail="interactions must be an array")

    if len(interactions) > _MAX_INTERACTIONS_BATCH:
        raise HTTPException(
            status_code=400,
            detail=f"Batch size exceeds maximum of {_MAX_INTERACTIONS_BATCH}",
        )

    # Validate every item up front — parsing each AT-URI exactly once — so the
    # batch is accepted or rejected atomically and the firing loop below never
    # has to re-parse already-validated data.
    parsed: list[tuple[str, str, str]] = []  # (event type, target did, target rkey)
    for i, item in enumerate(interactions):
        if not isinstance(item, dict):
            raise HTTPException(status_code=400, detail=f"interactions[{i}]: must be an object")

        itype = item.get("type")
        if itype not in _VALID_INTERACTION_TYPES:
            raise HTTPException(
                status_code=400,
                detail=f"interactions[{i}]: invalid type '{itype}', "
                f"must be one of: {', '.join(sorted(_VALID_INTERACTION_TYPES))}",
            )

        dataset_uri = item.get("datasetUri")
        if not isinstance(dataset_uri, str):
            raise HTTPException(
                status_code=400, detail=f"interactions[{i}]: datasetUri is required"
            )
        try:
            did, _collection, rkey = parse_at_uri(dataset_uri)
        except ValueError:
            # Chain suppressed (B904): clients only need the 400 detail.
            raise HTTPException(
                status_code=400,
                detail=f"interactions[{i}]: invalid AT-URI: {dataset_uri}",
            ) from None

        timestamp = item.get("timestamp")
        if timestamp is not None:
            if not isinstance(timestamp, str):
                raise HTTPException(
                    status_code=400,
                    detail=f"interactions[{i}]: timestamp must be a string",
                )
            try:
                _validate_iso8601(timestamp)
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"interactions[{i}]: invalid ISO 8601 timestamp: {timestamp}",
                ) from None

        parsed.append((itype, did, rkey))

    # All valid — fire analytics events
    for itype, did, rkey in parsed:
        fire_analytics_event(pool, itype, target_did=did, target_rkey=rkey)

    return {}
Loading