44 changes: 32 additions & 12 deletions README.md
@@ -57,6 +57,7 @@ Resources are your data sources that can be:
- Infer edge configurations from foreign key relationships
- Create Resource mappings from PostgreSQL tables automatically
- Direct database access - ingest data without exporting to files first
- **Async ingestion**: Efficient async/await-based ingestion pipeline for better performance
- **Parallel processing**: Use as many cores as you have
- **Database support**: Ingest into ArangoDB, Neo4j, **TigerGraph**, **FalkorDB**, and **Memgraph** using the same API (database agnostic). Source data from PostgreSQL and other SQL databases. A sketch follows this list.
- **Server-side filtering**: Efficient querying with server-side filtering support (TigerGraph REST++ API)
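
The database-agnostic claim above can be exercised end to end. A minimal sketch, assuming a `schema.yaml` and data files set up as in the bundled examples (the `add_file_pattern` arguments are elided; adjust to your data):

```python
from suthing import FileHandle

from graflo import Patterns, Schema
from graflo.db.connection.onto import ArangoConfig, Neo4jConfig
from graflo.hq import GraphEngine
from graflo.hq.caster import IngestionParams

schema = Schema.from_dict(FileHandle.load("schema.yaml"))

patterns = Patterns()
# Populate via patterns.add_file_pattern(...) as in the examples below.

# Swapping the target backend is a one-line config change:
conn_conf = ArangoConfig.from_docker_env()  # or Neo4jConfig.from_docker_env()

engine = GraphEngine(target_db_flavor=conn_conf.connection_type)
engine.define_and_ingest(
    schema=schema,
    output_config=conn_conf,
    patterns=patterns,
    ingestion_params=IngestionParams(clean_start=False),
)
```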
@@ -123,46 +124,65 @@ patterns.add_file_pattern(
schema.fetch_resource()

from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine

caster = Caster(schema)

# Option 1: Use GraphEngine for schema definition and ingestion (recommended)
engine = GraphEngine()
ingestion_params = IngestionParams(
clean_start=False, # Set to True to wipe existing database
# max_items=1000, # Optional: limit number of items to process
# batch_size=10000, # Optional: customize batch size
)

caster.ingest(
engine.define_and_ingest(
schema=schema,
output_config=conn_conf, # Target database config
patterns=patterns, # Source data patterns
ingestion_params=ingestion_params,
clean_start=False, # Set to True to wipe existing database
)

# Option 2: Use Caster directly (schema must be defined separately)
# from graflo.hq import GraphEngine
# engine = GraphEngine()
# engine.define_schema(schema=schema, output_config=conn_conf, clean_start=False)
#
# caster = Caster(schema)
# caster.ingest(
# output_config=conn_conf,
# patterns=patterns,
# ingestion_params=ingestion_params,
# )
```

### PostgreSQL Schema Inference

```python
from graflo.db.postgres import PostgresConnection
from graflo.hq import GraphEngine
from graflo.db.connection.onto import PostgresConfig
from graflo.db.connection.onto import PostgresConfig, ArangoConfig
from graflo import Caster
from graflo.onto import DBFlavor
from graflo.onto import DBType

# Connect to PostgreSQL
postgres_config = PostgresConfig.from_docker_env() # or PostgresConfig.from_env()
postgres_conn = PostgresConnection(postgres_config)

# Create GraphEngine and infer schema from PostgreSQL 3NF database
engine = GraphEngine(target_db_flavor=DBFlavor.ARANGO)
# Connection is automatically managed inside infer_schema()
engine = GraphEngine(target_db_flavor=DBType.ARANGO)
schema = engine.infer_schema(
postgres_conn,
postgres_config,
schema_name="public", # PostgreSQL schema name
)

# Close PostgreSQL connection
postgres_conn.close()
# Define schema in target database (optional, can also use define_and_ingest)
target_config = ArangoConfig.from_docker_env()
engine.define_schema(
schema=schema,
output_config=target_config,
clean_start=False,
)

# Use the inferred schema with Caster
# Use the inferred schema with Caster for ingestion
caster = Caster(schema)
# ... continue with ingestion
```
56 changes: 18 additions & 38 deletions docs/examples/example-5.md
@@ -264,26 +264,20 @@ Automatically generate a graflo Schema from your PostgreSQL database. This is th
```python

from graflo.hq import GraphEngine
from graflo.onto import DBFlavor
from graflo.onto import DBType
from graflo.db.connection.onto import ArangoConfig, Neo4jConfig, TigergraphConfig, FalkordbConfig, PostgresConfig
from graflo.db import DBType

# Connect to target graph database to determine flavor
# Connect to target graph database to determine database type
# Choose one of: ArangoConfig, Neo4jConfig, TigergraphConfig, or FalkordbConfig
target_config = ArangoConfig.from_docker_env() # or Neo4jConfig, TigergraphConfig, FalkordbConfig

# Determine db_flavor from target config
# Get database type from target config
db_type = target_config.connection_type
db_flavor = (
DBFlavor(db_type.value)
if db_type in (DBType.ARANGO, DBType.NEO4J, DBType.TIGERGRAPH, DBType.FALKORDB)
else DBFlavor.ARANGO
)

# Create GraphEngine and infer schema automatically
# Connection is automatically managed inside infer_schema()
postgres_conf = PostgresConfig.from_docker_env()
engine = GraphEngine(target_db_flavor=db_flavor)
engine = GraphEngine(target_db_flavor=db_type)
schema = engine.infer_schema(
postgres_conf,
schema_name="public", # PostgreSQL schema name
@@ -373,26 +367,22 @@ Finally, ingest the data from PostgreSQL into your target graph database. This i
5. **Graph Database Storage**: Data is written to the target graph database (ArangoDB/Neo4j/TigerGraph) using database-specific APIs for optimal performance. The system handles duplicates and updates based on indexes.

```python
from graflo import Caster

# Create Caster with inferred schema
caster = Caster(schema)

# Ingest data from PostgreSQL into graph database
from graflo.hq import GraphEngine
from graflo.hq.caster import IngestionParams

# Use GraphEngine for schema definition and ingestion
engine = GraphEngine()
ingestion_params = IngestionParams(
clean_start=True, # Clear existing data first
)

caster.ingest(
engine.define_and_ingest(
schema=schema,
output_config=target_config, # Target graph database config
patterns=patterns, # PostgreSQL table patterns
ingestion_params=ingestion_params,
clean_start=True, # Clear existing data first
)

# Cleanup
postgres_conn.close()
```

## Complete Example
@@ -404,11 +394,10 @@ import logging
from pathlib import Path
import yaml

from graflo import Caster
from graflo.onto import DBFlavor
from graflo.db import DBType
from graflo.onto import DBType
from graflo.hq import GraphEngine
from graflo.db.connection.onto import ArangoConfig, PostgresConfig
from graflo.hq.caster import IngestionParams

logger = logging.getLogger(__name__)

@@ -427,14 +416,9 @@ target_config = ArangoConfig.from_docker_env()  # or Neo4jConfig, TigergraphConf
# Step 4: Infer Schema from PostgreSQL database structure
# Connection is automatically managed inside infer_schema()
db_type = target_config.connection_type
db_flavor = (
DBFlavor(db_type.value)
if db_type in (DBType.ARANGO, DBType.NEO4J, DBType.TIGERGRAPH)
else DBFlavor.ARANGO
)

# Create GraphEngine and infer schema
engine = GraphEngine(target_db_flavor=db_flavor)
engine = GraphEngine(target_db_flavor=db_type)
schema = engine.infer_schema(
postgres_conf,
schema_name="public",
@@ -450,24 +434,20 @@ logger.info(f"Inferred schema saved to {schema_output_file}")
engine = GraphEngine()
patterns = engine.create_patterns(postgres_conf, schema_name="public")

# Step 7: Create Caster and ingest data
from graflo.hq.caster import IngestionParams

caster = Caster(schema)

# Step 7: Define schema and ingest data
ingestion_params = IngestionParams(
clean_start=True, # Clear existing data first
)

caster.ingest(
# Use GraphEngine to define schema and ingest data
engine.define_and_ingest(
schema=schema,
output_config=target_config,
patterns=patterns,
ingestion_params=ingestion_params,
clean_start=True, # Clear existing data first
)

# Cleanup
postgres_conn.close()

print("\n" + "=" * 80)
print("Ingestion complete!")
print("=" * 80)
43 changes: 34 additions & 9 deletions docs/getting_started/quickstart.md
@@ -10,7 +10,7 @@ This guide will help you get started with graflo by showing you how to transform
- `DataSource` defines where data comes from (files, APIs, SQL databases, in-memory objects).
- The `Patterns` class manages the mapping of resources to their physical data sources (files or PostgreSQL tables). It handles PostgreSQL connections efficiently by grouping tables that share the same connection configuration.
- `DataSourceRegistry` maps DataSources to Resources (many DataSources can map to the same Resource).
1- Database backend configurations use Pydantic `BaseSettings` with environment variable support. Use `ArangoConfig`, `Neo4jConfig`, `TigergraphConfig`, `FalkordbConfig`, `MemgraphConfig`, or `PostgresConfig` directly, or load from docker `.env` files using `from_docker_env()`. All configs inherit from `DBConfig` and support unified `database`/`schema_name` structure with `effective_database` and `effective_schema` properties for database-agnostic access. If `effective_schema` is not set, `Caster` automatically uses `Schema.general.name` as fallback.
1- Database backend configurations use Pydantic `BaseSettings` with environment variable support. Use `ArangoConfig`, `Neo4jConfig`, `TigergraphConfig`, `FalkordbConfig`, `MemgraphConfig`, or `PostgresConfig` directly, or load from docker `.env` files using `from_docker_env()`. All configs inherit from `DBConfig` and support a unified `database`/`schema_name` structure with `effective_database` and `effective_schema` properties for database-agnostic access. If `effective_schema` is not set, `GraphEngine.define_schema()` automatically uses `Schema.general.name` as fallback. A minimal config sketch follows this list.
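
A minimal sketch of the configuration surface described above (the docker `.env` file is assumed to exist; the attribute names are those listed):

```python
from graflo.db.connection.onto import ArangoConfig

# Load target-database settings from a docker .env file
conn_conf = ArangoConfig.from_docker_env()

# Unified, database-agnostic accessors shared by all DBConfig subclasses
print(conn_conf.effective_database)
# Falls back to Schema.general.name at schema-definition time if unset
print(conn_conf.effective_schema)
```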

## Basic Example

@@ -77,16 +77,32 @@ patterns = Patterns(
)

from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine

# Option 1: Use GraphEngine for schema definition and ingestion (recommended)
engine = GraphEngine()
ingestion_params = IngestionParams(
clean_start=False, # Set to True to wipe existing database
)

caster.ingest(
engine.define_and_ingest(
schema=schema,
output_config=conn_conf, # Target database config
patterns=patterns, # Source data patterns
ingestion_params=ingestion_params,
clean_start=False, # Set to True to wipe existing database
)

# Option 2: Use Caster directly (schema must be defined separately)
# engine = GraphEngine()
# engine.define_schema(schema=schema, output_config=conn_conf, clean_start=False)
#
# caster = Caster(schema)
# caster.ingest(
# output_config=conn_conf,
# patterns=patterns,
# ingestion_params=ingestion_params,
# )
```

Here `schema` defines the graph and the mapping of the sources to vertices and edges (refer to [Schema](../concepts/index.md#schema) for details on schema and its components).
@@ -138,23 +154,23 @@ patterns = Patterns(

# Ingest
from graflo.db.connection.onto import ArangoConfig
from graflo.hq import GraphEngine

arango_config = ArangoConfig.from_docker_env() # Target graph database
caster = Caster(schema)

from graflo.hq.caster import IngestionParams

# Use GraphEngine for schema definition and ingestion
engine = GraphEngine()
ingestion_params = IngestionParams(
clean_start=False, # Set to True to wipe existing database
)

caster.ingest(
engine.define_and_ingest(
schema=schema,
output_config=arango_config, # Target graph database
patterns=patterns, # Source PostgreSQL tables
ingestion_params=ingestion_params,
clean_start=False, # Set to True to wipe existing database
)

pg_conn.close()
```

## Using API Data Sources
@@ -189,9 +205,18 @@ registry.register(api_source, resource_name="users")

# Ingest
from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine

caster = Caster(schema)
# Define schema first (required before ingestion)
engine = GraphEngine()
engine.define_schema(
schema=schema,
output_config=conn_conf,
clean_start=False,
)

# Then ingest using Caster
caster = Caster(schema)
ingestion_params = IngestionParams() # Use default parameters

caster.ingest_data_sources(
8 changes: 8 additions & 0 deletions docs/index.md
@@ -49,6 +49,13 @@ Resources define how data is transformed into a graph (semantic mapping). They work with:
- **Table-like processing**: CSV files, SQL tables, API responses
- **JSON-like processing**: JSON files, nested data structures, hierarchical API responses

### GraphEngine
The `GraphEngine` orchestrates graph database operations, providing a unified interface for the following (see the sketch after this list):
- Schema inference from PostgreSQL databases
- Schema definition in target graph databases (moved from Caster)
- Pattern creation from data sources
- Data ingestion with async support
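
A minimal end-to-end sketch of these four operations, composed from the calls shown in the examples (docker `.env` files for both databases are assumed):

```python
from graflo.db.connection.onto import ArangoConfig, PostgresConfig
from graflo.hq import GraphEngine
from graflo.hq.caster import IngestionParams

postgres_conf = PostgresConfig.from_docker_env()
target_conf = ArangoConfig.from_docker_env()

engine = GraphEngine(target_db_flavor=target_conf.connection_type)

# 1. Infer a schema from a normalized (3NF) PostgreSQL database;
#    the connection is managed inside infer_schema()
schema = engine.infer_schema(postgres_conf, schema_name="public")

# 2. Create source patterns from the same tables
patterns = engine.create_patterns(postgres_conf, schema_name="public")

# 3 + 4. Define the schema in the target database, then ingest
engine.define_and_ingest(
    schema=schema,
    output_config=target_conf,
    patterns=patterns,
    ingestion_params=IngestionParams(clean_start=False),
)
```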

## Key Features

- **🚀 PostgreSQL Schema Inference**: **Automatically generate schemas from normalized PostgreSQL databases (3NF)** - No manual schema definition needed!
@@ -71,6 +78,7 @@ Resources define how data is transformed into a graph (semantic mapping). They work with:
- Vertex fields support types (INT, FLOAT, STRING, DATETIME, BOOL) for better validation
- Edge weight fields can specify types for improved type safety
- Backward compatible: fields without types default to None (suitable for databases like ArangoDB)
- **Async Ingestion**: Efficient async/await-based ingestion pipeline for better performance
- **Parallel Processing**: Efficient processing with multi-threading
- **Database Integration**: Seamless integration with Neo4j, ArangoDB, TigerGraph, FalkorDB, Memgraph, and PostgreSQL (as source)
- **Advanced Filtering**: Powerful filtering capabilities for data transformation with server-side filtering support
18 changes: 12 additions & 6 deletions examples/1-ingest-csv/ingest.py
@@ -1,8 +1,9 @@
import pathlib
from suthing import FileHandle
from graflo import Caster, Patterns, Schema
from graflo import Patterns, Schema
from graflo.util.onto import FilePattern
from graflo.db.connection.onto import ArangoConfig
from graflo.hq import GraphEngine
from graflo.hq.caster import IngestionParams

schema = Schema.from_dict(FileHandle.load("schema.yaml"))
@@ -22,6 +23,9 @@
# database="_system",
# )

# Determine DB type from connection config
db_type = conn_conf.connection_type

# Create Patterns with file patterns
patterns = Patterns()
patterns.add_file_pattern(
@@ -45,10 +49,12 @@
# }
# )

caster = Caster(schema)


# Create GraphEngine and define schema + ingest in one operation
engine = GraphEngine(target_db_flavor=db_type)
ingestion_params = IngestionParams(clean_start=True)
caster.ingest(
output_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params
engine.define_and_ingest(
schema=schema,
output_config=conn_conf,
patterns=patterns,
ingestion_params=ingestion_params,
)