Merged
2 changes: 2 additions & 0 deletions .cursorignore
@@ -28,6 +28,8 @@
!sh/
!sh/**/*.sh

!planning
!planning/*MD

!.github/**
!.env.example
3 changes: 2 additions & 1 deletion .gitignore
@@ -76,4 +76,5 @@ target/
#*/**/*png
*/**/*pdf

site/
site/
planning/
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.5.0] - 2026-02-02

### Added
- **Ingestion date range**: `IngestionParams` supports `datetime_after`, `datetime_before`, and `datetime_column` so ingestion can be restricted to a date range
- Use with `GraphEngine.create_patterns(..., datetime_columns={...})` for per-resource datetime columns, or set `IngestionParams.datetime_column` for a single default column
- Rows are included when the datetime column value is in `[datetime_after, datetime_before)` (inclusive lower, exclusive upper)
- Applies to SQL/PostgreSQL table ingestion; enables sampling or incremental loads by time window
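The half-open interval semantics can be illustrated in plain Python (a sketch of the documented behavior only, not graflo's internal implementation; the rows and column name are made up):

```python
from datetime import datetime

def in_window(value, datetime_after, datetime_before):
    """Return True when value falls in [datetime_after, datetime_before)."""
    return datetime_after <= value < datetime_before

after = datetime(2020, 1, 1)
before = datetime(2021, 1, 1)

rows = [
    {"created_at": datetime(2019, 12, 31)},  # before the window: excluded
    {"created_at": datetime(2020, 1, 1)},    # lower bound: included (inclusive)
    {"created_at": datetime(2020, 12, 31)},  # inside the window: included
    {"created_at": datetime(2021, 1, 1)},    # upper bound: excluded (exclusive)
]
kept = [r for r in rows if in_window(r["created_at"], after, before)]
print(len(kept))  # → 2
```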

### Changed
- **Configs use Pydantic**: Schema and all schema-related configs now use Pydantic `BaseModel` (via `ConfigBaseModel`) instead of dataclasses
- `Schema`, `SchemaMetadata`, `VertexConfig`, `Vertex`, `EdgeConfig`, `Edge`, `Resource`, `WeightConfig`, `Field`, and actor configs are Pydantic models
- Validation, YAML/dict loading via `model_validate()` / `from_dict()` / `from_yaml()`, and consistent serialization
- Backward compatible: `resources` accepts an empty dict as an empty list; field/weight inputs accept strings, `Field` objects, or dicts
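For consumers, the practical difference is that configs now validate on construction and load from dicts via `model_validate()`. A minimal sketch with a made-up stand-in model (the field names here are illustrative, not graflo's actual `WeightConfig` schema):

```python
from pydantic import BaseModel, ValidationError

class WeightExample(BaseModel):  # illustrative stand-in, not graflo's WeightConfig
    name: str
    value: float = 1.0

# Dict loading, analogous to from_dict()/model_validate() on the real configs
w = WeightExample.model_validate({"name": "score", "value": 2.5})
print(w.value)  # → 2.5

# Invalid input now fails fast with a clear error instead of a silent bad state
try:
    WeightExample.model_validate({"value": "not-a-number"})
except ValidationError:
    print("validation failed")
```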

## [1.4.5] - 2026-02-02

### Added
2 changes: 1 addition & 1 deletion docs/examples/example-1.md
@@ -121,7 +121,7 @@ from graflo.hq.caster import IngestionParams
caster = Caster(schema)

ingestion_params = IngestionParams(
recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists)
clear_data=True, # Clear existing data before ingesting
# max_items=1000, # Optional: limit number of items to process
)

2 changes: 1 addition & 1 deletion docs/examples/example-2.md
@@ -132,7 +132,7 @@ patterns.add_file_pattern(
from graflo.hq.caster import IngestionParams

ingestion_params = IngestionParams(
recreate_schema=True, # Wipe existing schema before defining and ingesting
clear_data=True, # Clear existing data before ingesting
)

caster.ingest(
2 changes: 1 addition & 1 deletion docs/examples/example-3.md
@@ -120,7 +120,7 @@ from graflo.hq.caster import IngestionParams
caster = Caster(schema)

ingestion_params = IngestionParams(
recreate_schema=True, # Wipe existing schema before defining and ingesting
clear_data=True, # Clear existing data before ingesting
)

caster.ingest(
2 changes: 1 addition & 1 deletion docs/examples/example-4.md
@@ -214,7 +214,7 @@ from graflo.hq.caster import IngestionParams
caster = Caster(schema)

ingestion_params = IngestionParams(
recreate_schema=True, # Wipe existing schema before defining and ingesting
clear_data=True, # Clear existing data before ingesting
)

caster.ingest(
38 changes: 36 additions & 2 deletions docs/examples/example-5.md
@@ -337,11 +337,31 @@ patterns = engine.create_patterns(
)
```

**Optional: datetime columns for date-range filtering**

To restrict ingestion to a time window, pass `datetime_columns`: a mapping from resource (table) name to the name of the datetime column used for filtering. Use this together with `IngestionParams(datetime_after=..., datetime_before=...)` in the ingestion step:

```python
# Optional: map each table to its datetime column for date-range filtering
datetime_columns = {
"purchases": "purchase_date",
"users": "created_at",
"products": "created_at",
"follows": "created_at",
}
patterns = engine.create_patterns(
postgres_conf,
schema_name="public",
datetime_columns=datetime_columns,
)
```

This creates `TablePattern` instances for each table, which:

- Map table names to resource names (e.g., `users` table → `users` resource)
- Store PostgreSQL connection configuration
- Enable the Caster to query data directly from PostgreSQL using SQL
- Optionally store a `date_field` for date-range filtering when `datetime_columns` is provided
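The record each pattern stores can be pictured as a small per-table mapping (a hypothetical sketch; graflo's actual `TablePattern` has a richer interface):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class TablePatternSketch:  # hypothetical stand-in for graflo's TablePattern
    table_name: str
    resource_name: str
    date_field: Optional[str] = None  # set when datetime_columns provides one

datetime_columns = {"purchases": "purchase_date", "users": "created_at"}
patterns = [
    TablePatternSketch(
        table_name=t,
        resource_name=t,  # table name doubles as resource name
        date_field=datetime_columns.get(t),
    )
    for t in ["purchases", "users", "products"]
]
print(patterns[2].date_field)  # → None (no datetime column configured for products)
```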

**How Patterns Work:**

@@ -366,14 +386,28 @@ Finally, ingest the data from PostgreSQL into your target graph database. This i

5. **Graph Database Storage**: Data is written to the target graph database (ArangoDB/Neo4j/TigerGraph) using database-specific APIs for optimal performance. The system handles duplicates and updates based on indexes.
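The duplicate handling in step 5 amounts to an upsert keyed on the indexed fields. A sketch of the idea in plain Python (the index field and documents are made up; the real write path uses database-specific bulk APIs):

```python
def upsert(store, docs, index_field):
    """Insert new docs and merge updates into existing ones, keyed on index_field."""
    for doc in docs:
        key = doc[index_field]
        store[key] = {**store.get(key, {}), **doc}
    return store

store = {}
upsert(store, [{"user_id": 1, "name": "Ada"}], index_field="user_id")
upsert(store, [{"user_id": 1, "name": "Ada L."}, {"user_id": 2, "name": "Grace"}],
       index_field="user_id")
print(len(store))        # → 2 (re-ingesting user_id=1 updates, not duplicates)
print(store[1]["name"])  # → Ada L.
```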

**Restricting ingestion by date range**

You can limit which rows are ingested by providing a date range in `IngestionParams`. Use `datetime_after` and `datetime_before` (ISO-format strings); only rows whose datetime column value falls in `[datetime_after, datetime_before)` are included. This requires either:

- Passing `datetime_columns` when creating patterns (see Step 5), or
- Setting `datetime_column` in `IngestionParams` as a single default column for all resources.

Example:

```python
from graflo.hq import GraphEngine
from graflo.hq.caster import IngestionParams

# Use GraphEngine for schema definition and ingestion
engine = GraphEngine()
ingestion_params = IngestionParams(
recreate_schema=True, # Drop existing schema and define new one before ingesting
clear_data=True, # Clear existing data before ingesting
# Optional: ingest only rows in this date range (requires datetime_columns in create_patterns
# or datetime_column below)
# datetime_after="2020-01-01",
# datetime_before="2021-01-01",
# datetime_column="created_at", # default column when a pattern has no date_field
)

engine.define_and_ingest(
@@ -436,7 +470,7 @@ patterns = engine.create_patterns(postgres_conf, schema_name="public")

# Step 7: Define schema and ingest data
ingestion_params = IngestionParams(
recreate_schema=True, # Drop existing schema and define new one before ingesting
clear_data=True, # Clear existing data before ingesting
)

# Use GraphEngine to define schema and ingest data
5 changes: 4 additions & 1 deletion docs/examples/example-6.md
@@ -91,6 +91,7 @@ config_data = FileHandle.load("db.yaml")
conn_conf = DBConfig.from_dict(config_data)

ingestion_params = IngestionParams(
clear_data=True,
batch_size=1000, # Process 1000 items per batch
)
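With `batch_size=1000`, items are processed in fixed-size chunks. The chunking itself can be sketched as follows (illustrative only; graflo batches internally):

```python
def batched(items, batch_size):
    """Yield successive chunks of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

items = list(range(2500))
batches = list(batched(items, 1000))
print([len(b) for b in batches])  # → [1000, 1000, 500]
```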

@@ -218,7 +219,9 @@ registry.register(file_source, resource_name="users")
# Both will be processed and combined
from graflo.hq.caster import IngestionParams

ingestion_params = IngestionParams() # Use default parameters
ingestion_params = IngestionParams(
clear_data=True,
)

caster.ingest_data_sources(
data_source_registry=registry,