Merged
22 changes: 22 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,22 @@
name: CI

on:
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: astral-sh/setup-uv@v5

Comment on lines +8 to +14

Copilot AI Feb 10, 2026

The workflow doesn't pin a Python version. Since ubuntu-latest can change its default Python over time, CI results may unexpectedly change or fail even if the project hasn't. Consider explicitly selecting a Python version (or a test matrix) consistent with requires-python.
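
One way to address this, sketched under the assumption that `setup-uv`'s `python-version` input is used (the version shown is illustrative and should be kept consistent with the project's `requires-python: >=3.8`):

```yaml
- uses: astral-sh/setup-uv@v5
  with:
    python-version: "3.10"  # illustrative; align with requires-python
```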
      - name: Install Pillow build dependencies
        run: |
          sudo apt-get update
          sudo apt-get install -y python3-dev build-essential libjpeg-dev zlib1g-dev

      - run: uv sync --group dev

      - run: uv run pytest
Comment on lines +20 to +22
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI runs uv sync --group dev without enforcing that uv.lock is up-to-date (e.g., via --locked/--frozen). That means dependency drift can slip in (pyproject changes not reflected in the lockfile) while CI still passes. Consider making CI fail when the lockfile is missing/out-of-sync.

Copilot uses AI. Check for mistakes.
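
A sketch of the suggested hardening, assuming uv's `--locked` flag (which makes `uv sync` fail instead of silently re-resolving when `uv.lock` is missing or out of date):

```yaml
- run: uv sync --locked --group dev
```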
65 changes: 65 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,65 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

This is the **SMART Integrate Connector SDK** (`cdip_connector`), a Python library for building wildlife/conservation data integration connectors that pull data from external sources and push it to the CDIP (Conservation Data Integration Platform) Sensors API. The project is part of the Gundi ecosystem and is in maintenance/deprecated status (superseded by [gundi-integration-action-runner](https://github.com/PADAS/gundi-integration-action-runner)).

## Build & Development

Package manager: **uv** (pyproject.toml with hatchling build backend, uv.lock present)

```bash
# Install dependencies
uv sync

# Install with dev dependencies
uv sync --group dev

# Run tests
uv run pytest tests/

# Run a single test
uv run pytest tests/test_portalapi.py::TestPortalApi::test_create

# Lint
uv run pylint cdip_connector/

# Format
uv run black cdip_connector/
```

Python requirement: `>=3.8`. The .venv uses Python 3.10.

## Architecture

### Core Library (`cdip_connector/core/`)

- **`connector_base.py`** — The central module. Contains `AbstractConnector` which all connectors subclass. Implements the ETL pipeline: `execute()` → `main()` → `extract_load()` → `extract()` (abstract) + `load()`. Also contains job partitioning logic (`calculate_partition`, `filter_items_for_task`) for running parallel instances via Kubernetes CronJobs or Cloud Run Jobs.
- **`cdip_settings.py`** — All configuration via environment variables using `environs`. Includes Keycloak auth settings, API endpoints, GCP settings, and job partitioning (JOB_COMPLETION_INDEX/COUNT). Can load from a custom env file via `CDIP_SDK_ENVFILE`.
- **`schemas/__init__.py`** — Re-exports all schemas from `gundi_core.schemas` for backward compatibility.
- **`routing.py`** — Pub/Sub topic name definitions for the observation processing pipeline.
- **`cloudstorage.py`** — Abstract `CloudStorage` with `GoogleCloudStorage` and `LocalStorage` implementations for camera trap image handling.
Copilot AI Feb 10, 2026

CLAUDE.md references a GoogleCloudStorage implementation, but the current class in cdip_connector/core/cloudstorage.py is named GoogleCouldStorage (note the missing “l”). This mismatch can mislead contributors; either update the doc to match the actual symbol name or rename the class for consistency.

Suggested change
- **`cloudstorage.py`** — Abstract `CloudStorage` with `GoogleCloudStorage` and `LocalStorage` implementations for camera trap image handling.
- **`cloudstorage.py`** — Abstract `CloudStorage` with `GoogleCouldStorage` and `LocalStorage` implementations for camera trap image handling.
- **`tracing.py`** — OpenTelemetry distributed tracing setup with GCP Cloud Trace export.
- **`logconfig.py`** — JSON-formatted logging configuration. Can be overridden by creating a `local_log_settings.py` module.

### Connector Pattern

Connectors subclass `AbstractConnector` and implement `extract()` as an async generator yielding lists of data records. See `connector_skeleton.py` for the template. The base class handles:
1. Fetching authorized integrations from the Gundi Portal API
2. Partitioning integrations across parallel job instances (hash-based on integration UUID)
3. Batched HTTP POST to the Sensors API with exponential backoff retry
4. State management via `update_state()`
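
The pattern above can be illustrated with a reduced, self-contained stand-in for the base class (the real `AbstractConnector` also handles auth, partitioning, batching, and retries; everything here besides the `extract()`/`extract_load()` shape is simplified for illustration):

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Dict, List


class AbstractConnector(ABC):
    """Reduced stand-in for cdip_connector's base class (illustration only)."""

    @abstractmethod
    def extract(self, integration) -> AsyncGenerator[List[Dict[str, Any]], None]:
        """Async generator yielding lists (batches) of data records."""

    async def extract_load(self, integration) -> int:
        # A real connector POSTs each batch to the Sensors API with retries;
        # here we just count the records that flow through.
        total = 0
        async for batch in self.extract(integration):
            total += len(batch)
        return total


class DemoConnector(AbstractConnector):
    async def extract(self, integration):
        yield [{"device_id": "demo-1", "lat": 0.0, "lon": 0.0}]
        yield [{"device_id": "demo-2", "lat": 1.0, "lon": 1.0}]


total = asyncio.run(DemoConnector().extract_load(integration=None))
```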

### Key External Dependencies

- **`gundi-client`** — `PortalApi` for portal interactions (auth, integrations, device states)
- **`gundi-core`** — Shared schemas (`IntegrationInformation`, `CDIPBaseModel`, `Position`, etc.)
- **`google-cloud-pubsub`** / **`google-cloud-storage`** — GCP integrations
- **`httpx`** — Async HTTP client for Sensors API calls
- **`backoff`** — Retry logic on HTTP errors
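
The retry behavior the SDK gets from the `backoff` package (exponential backoff on HTTP errors) can be shown with a stdlib-only sketch; the real code uses `backoff`'s decorator API against `httpx` errors, which is more featureful than this illustration:

```python
import random
import time


def retry_with_backoff(fn, max_tries=5, base_delay=0.01, exc=Exception):
    # Stdlib sketch of exponential backoff with jitter; the SDK itself uses
    # the `backoff` package rather than this hand-rolled loop.
    for attempt in range(max_tries):
        try:
            return fn()
        except exc:
            if attempt == max_tries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) * random.random())


calls = {"n": 0}

def flaky_post():
    # Simulated Sensors API call that fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network error")
    return "accepted"


result = retry_with_backoff(flaky_post, exc=ConnectionError)
```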

### Job Partitioning

When `JOB_COMPLETION_INDEX` and `JOB_COMPLETION_COUNT` are set (via Kubernetes or Cloud Run), integrations are distributed across job instances using SHA1 hash of the integration UUID modulo partition count. This is configured in `cdip_settings.py` and applied in `connector_base.py:filter_items_for_task()`.
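
A sketch of that scheme (the helper names mirror the ones listed above, but the signatures are illustrative, not the SDK's exact API):

```python
import hashlib
import uuid
from typing import Iterable, List


def calculate_partition(integration_id: uuid.UUID, partition_count: int) -> int:
    # SHA1 of the UUID string, taken modulo the number of job instances.
    digest = hashlib.sha1(str(integration_id).encode("utf-8")).hexdigest()
    return int(digest, 16) % partition_count


def filter_items_for_task(
    ids: Iterable[uuid.UUID], task_index: int, task_count: int
) -> List[uuid.UUID]:
    # Each job instance keeps only the integrations that hash to its index.
    return [i for i in ids if calculate_partition(i, task_count) == task_index]


ids = [uuid.UUID(int=n) for n in range(10)]
partitions = [filter_items_for_task(ids, k, 3) for k in range(3)]
```

Because the assignment is a pure function of the UUID, every instance computes the same partitioning independently, with no coordination between jobs.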
11 changes: 4 additions & 7 deletions cdip_connector/core/connector_base.py
@@ -6,7 +6,7 @@
 
 from abc import ABC, abstractmethod
 import uuid
-from typing import List, AsyncGenerator, Dict, Any
+from typing import List, AsyncGenerator, Dict, Any, Optional
 import httpx
 from cdip_connector.core import cdip_settings
 from cdip_connector.core import logconfig
@@ -41,12 +41,9 @@ class AbstractConnector(ABC):
     DEFAULT_LOOKBACK_DAYS = cdip_settings.DEFAULT_LOOKBACK_DAYS
     DEFAULT_REQUESTS_TIMEOUT = (3.1, 20)
 
-    def __init__(self):
-
+    def __init__(self, portal: Optional[PortalApi] = None):
         self.logger = logging.getLogger(self.__class__.__name__)
-
-        self.portal = PortalApi(data_timeout=cdip_settings.DEFAULT_DATA_TIMEOUT_SECONDS)
-
+        self.portal = portal if portal is not None else PortalApi(data_timeout=cdip_settings.DEFAULT_DATA_TIMEOUT_SECONDS)
         self.load_batch_size = cdip_settings.INTEGRATION_LOAD_BATCH_SIZE
         self.concurrency = cdip_settings.INTEGRATION_CONCURRENCY
 
@@ -70,7 +67,7 @@ async def main(self) -> None:
             batch = integrations[idx: idx + self.concurrency]
             batch_ids = [str(i.id) for i in batch]
             tasks = [
-                asyncio.ensure_future(self.__class__().extract_load(integration))
+                asyncio.ensure_future(self.__class__(portal=self.portal).extract_load(integration))
                 for integration in batch
             ]
             try:
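The effect of this change, making the portal client injectable, can be shown with a self-contained sketch (`PortalApi` is reduced to a stub here; the stub's constructor signature is illustrative):

```python
class PortalApi:
    # Stub standing in for gundi-client's PortalApi (illustration only).
    def __init__(self, data_timeout: int = 20):
        self.data_timeout = data_timeout


class Connector:
    # Mirrors the diff above: callers (and tests) may inject a portal;
    # otherwise a default client is constructed, as before.
    def __init__(self, portal=None):
        self.portal = portal if portal is not None else PortalApi(data_timeout=20)


default_connector = Connector()   # builds its own client
fake_portal = object()
test_connector = Connector(portal=fake_portal)  # test double injected
```

Note that the diff also threads `self.portal` into the per-integration instances created in `main()`, so child tasks now reuse one client instead of each constructing their own.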