diff --git a/CHANGELOG.md b/CHANGELOG.md index daf862c1..3716805c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,30 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.4.3] - 2026-01-25 + +### Added +- **SchemaSanitizer for TigerGraph**: Added comprehensive schema sanitization for TigerGraph compatibility + - `SchemaSanitizer` class in `graflo.hq.sanitizer` module for sanitizing schema attributes + - Sanitizes vertex names and field names to avoid reserved words (appends `_vertex` suffix for vertex names, `_attr` for attributes) + - Sanitizes edge relation names to avoid reserved words and collisions with vertex names (appends `_relation` suffix) + - Normalizes vertex indexes for TigerGraph: ensures edges with the same relation have consistent source and target indexes + - Automatically applies field index mappings to resources when indexes are normalized + - Handles field name transformations in TransformActor instances to maintain data consistency +- **Vertex `dbname` field**: Added `dbname` field to `Vertex` class for database-specific vertex name mapping + - Allows specifying a different database name than the logical vertex name + - Used by SchemaSanitizer to store sanitized vertex names for TigerGraph compatibility +- **Edge `relation_dbname` property**: Added `relation_dbname` property to `Edge` class for database-specific relation name mapping + - Returns sanitized relation name if set, otherwise falls back to `relation` field + - Used by SchemaSanitizer to store sanitized relation names for TigerGraph compatibility + - Supports setter for updating the database-specific relation name +- **GraphEngine orchestrator**: Added `GraphEngine` class as the main orchestrator for graph database operations + - Coordinates schema inference, pattern creation, and data 
ingestion workflows + - Provides unified interface: `infer_schema()`, `create_patterns()`, and `ingest()` methods + - Integrates `InferenceManager`, `ResourceMapper`, and `Caster` components + - Supports target database flavor configuration for schema sanitization + - Located in `graflo.hq.graph_engine` module + ## [1.4.0] - 2026-01-15 ### Removed diff --git a/README.md b/README.md index eb9830a3..c37503ef 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,7 @@ patterns.add_file_pattern( schema.fetch_resource() -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) @@ -143,7 +143,7 @@ caster.ingest( ```python from graflo.db.postgres import PostgresConnection -from graflo.db.inferencer import infer_schema_from_postgres +from graflo.hq import GraphEngine from graflo.db.connection.onto import PostgresConfig from graflo import Caster from graflo.onto import DBFlavor @@ -152,11 +152,11 @@ from graflo.onto import DBFlavor postgres_config = PostgresConfig.from_docker_env() # or PostgresConfig.from_env() postgres_conn = PostgresConnection(postgres_config) -# Infer schema from PostgreSQL 3NF database -schema = infer_schema_from_postgres( +# Create GraphEngine and infer schema from PostgreSQL 3NF database +engine = GraphEngine(target_db_flavor=DBFlavor.ARANGO) +schema = engine.infer_schema( postgres_conn, schema_name="public", # PostgreSQL schema name - db_flavor=DBFlavor.ARANGO # Target graph database flavor ) # Close PostgreSQL connection diff --git a/docs/examples/example-1.md b/docs/examples/example-1.md index da8e1989..12d8c886 100644 --- a/docs/examples/example-1.md +++ b/docs/examples/example-1.md @@ -116,7 +116,7 @@ patterns.add_file_pattern( # } # ) -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) diff --git a/docs/examples/example-2.md b/docs/examples/example-2.md index 97f12633..9e648964 100644 --- a/docs/examples/example-2.md +++ 
b/docs/examples/example-2.md @@ -129,7 +129,7 @@ patterns.add_file_pattern( FilePattern(regex="\Sjson$", sub_path=pathlib.Path("."), resource_name="work") ) -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams( clean_start=True, # Wipe existing database before ingestion diff --git a/docs/examples/example-3.md b/docs/examples/example-3.md index a9ea244b..644fb5fb 100644 --- a/docs/examples/example-3.md +++ b/docs/examples/example-3.md @@ -115,7 +115,7 @@ patterns.add_file_pattern( FilePattern(regex="^relations.*\.csv$", sub_path=pathlib.Path("."), resource_name="people") ) -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) diff --git a/docs/examples/example-4.md b/docs/examples/example-4.md index abac146a..f2f5eeac 100644 --- a/docs/examples/example-4.md +++ b/docs/examples/example-4.md @@ -209,7 +209,7 @@ patterns.add_file_pattern( FilePattern(regex=r"^bugs.*\.json(?:\.gz)?$", sub_path=pathlib.Path("./data"), resource_name="bug") ) -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) diff --git a/docs/examples/example-5.md b/docs/examples/example-5.md index 81b6ab43..a0b1ee86 100644 --- a/docs/examples/example-5.md +++ b/docs/examples/example-5.md @@ -64,7 +64,7 @@ The example uses a PostgreSQL database with a typical 3NF (Third Normal Form) sc ## Automatic Schema Inference -The `infer_schema_from_postgres()` function automatically analyzes your PostgreSQL database and creates a complete graflo Schema. This process involves several sophisticated steps: +The `GraphEngine.infer_schema()` method automatically analyzes your PostgreSQL database and creates a complete graflo Schema.
This process involves several sophisticated steps: ### How Schema Inference Works @@ -249,7 +249,7 @@ Make sure the corresponding database container is running before starting ingest Automatically generate a graflo Schema from your PostgreSQL database. This is the core of the automatic inference process: -**What `infer_schema_from_postgres()` does:** +**What `GraphEngine.infer_schema()` does:** 1. **Queries PostgreSQL Information Schema**: The function queries PostgreSQL's information schema to discover all tables in the specified schema. It retrieves column information (names, types, constraints), identifies primary keys and foreign keys, and understands table relationships. @@ -263,7 +263,7 @@ Automatically generate a graflo Schema from your PostgreSQL database. This is th ```python -from graflo.db.inferencer import infer_schema_from_postgres +from graflo.hq import GraphEngine from graflo.onto import DBFlavor from graflo.db.connection.onto import ArangoConfig, Neo4jConfig, TigergraphConfig, FalkordbConfig from graflo.db import DBType @@ -280,11 +280,11 @@ db_flavor = ( else DBFlavor.ARANGO ) -# Infer schema automatically -schema = infer_schema_from_postgres( +# Create GraphEngine and infer schema automatically +engine = GraphEngine(target_db_flavor=db_flavor) +schema = engine.infer_schema( postgres_conn, schema_name="public", # PostgreSQL schema name - db_flavor=db_flavor # Target graph database flavor ) ``` @@ -329,11 +329,14 @@ Create `Patterns` that map PostgreSQL tables to resources: ```python -from graflo.db.inferencer import create_patterns_from_postgres +from graflo.hq import GraphEngine + +# Create GraphEngine instance +engine = GraphEngine() # Create patterns from PostgreSQL tables -patterns = create_patterns_from_postgres( - postgres_conn, +patterns = engine.create_patterns( + postgres_conf, schema_name="public" ) ``` @@ -374,7 +377,7 @@ from graflo import Caster caster = Caster(schema) # Ingest data from PostgreSQL into graph database
-from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams( clean_start=True, # Clear existing data first @@ -382,7 +385,7 @@ ingestion_params = IngestionParams( caster.ingest( output_config=target_config, # Target graph database config - patterns=patterns, # PostgreSQL table patterns + patterns=patterns, # PostgreSQL table patterns ingestion_params=ingestion_params, ) @@ -405,7 +408,7 @@ from graflo.db import DBType from graflo.db.postgres import ( PostgresConnection, ) -from graflo.db.inferencer import infer_schema_from_postgres, create_patterns_from_postgres +from graflo.hq import GraphEngine from graflo.db.connection.onto import ArangoConfig, PostgresConfig logger = logging.getLogger(__name__) @@ -431,10 +434,11 @@ db_flavor = ( else DBFlavor.ARANGO ) -schema = infer_schema_from_postgres( +# Create GraphEngine and infer schema +engine = GraphEngine(target_db_flavor=db_flavor) +schema = engine.infer_schema( postgres_conn, schema_name="public", - db_flavor=db_flavor ) # Step 5: Save inferred schema to YAML (optional) @@ -444,10 +448,11 @@ with open(schema_output_file, "w") as f: logger.info(f"Inferred schema saved to {schema_output_file}") # Step 6: Create Patterns from PostgreSQL tables -patterns = create_patterns_from_postgres(postgres_conn, schema_name="public") +engine = GraphEngine() +patterns = engine.create_patterns(postgres_conf, schema_name="public") # Step 7: Create Caster and ingest data -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) @@ -709,8 +714,9 @@ This pattern is particularly useful for: After inference, you can modify the schema: ```python -# Infer schema -schema = infer_schema_from_postgres(postgres_conn, schema_name="public") +# Create GraphEngine and infer schema +engine = GraphEngine() +schema = engine.infer_schema(postgres_conn, schema_name="public") # Modify schema as needed # Add custom transforms, filters, or 
additional edges diff --git a/docs/examples/example-6.md b/docs/examples/example-6.md index c8915e07..debd13d0 100644 --- a/docs/examples/example-6.md +++ b/docs/examples/example-6.md @@ -83,7 +83,7 @@ registry = DataSourceRegistry() registry.register(api_source, resource_name="users") # Create caster and ingest -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) # Load config from file @@ -216,7 +216,7 @@ file_source = DataSourceFactory.create_file_data_source(path="users_backup.json" registry.register(file_source, resource_name="users") # Both will be processed and combined -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams() # Use default parameters diff --git a/docs/getting_started/quickstart.md b/docs/getting_started/quickstart.md index 36f3d812..8c67ae74 100644 --- a/docs/getting_started/quickstart.md +++ b/docs/getting_started/quickstart.md @@ -76,7 +76,7 @@ patterns = Patterns( } ) -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams( clean_start=False, # Set to True to wipe existing database @@ -109,18 +109,20 @@ You can ingest data directly from PostgreSQL tables. 
First, infer the schema fro ```python from graflo.db.postgres import PostgresConnection -from graflo.db.inferencer import infer_schema_from_postgres, create_patterns_from_postgres +from graflo.hq import GraphEngine from graflo.db.connection.onto import PostgresConfig # Connect to PostgreSQL pg_config = PostgresConfig.from_docker_env() # Or from_env(), or create directly pg_conn = PostgresConnection(pg_config) -# Infer schema from PostgreSQL (automatically detects vertices and edges) -schema = infer_schema_from_postgres(pg_conn, schema_name="public") +# Create GraphEngine and infer schema from PostgreSQL (automatically detects vertices and edges) +engine = GraphEngine() +schema = engine.infer_schema(pg_conn, schema_name="public") # Create patterns from PostgreSQL tables -patterns = create_patterns_from_postgres(pg_conn, schema_name="public") +engine = GraphEngine() +patterns = engine.create_patterns(pg_config, schema_name="public") # Or create patterns manually from graflo.util.onto import Patterns, TablePattern @@ -141,7 +143,7 @@ from graflo.db.connection.onto import ArangoConfig arango_config = ArangoConfig.from_docker_env() # Target graph database caster = Caster(schema) -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams( clean_start=False, # Set to True to wipe existing database @@ -187,7 +189,7 @@ registry = DataSourceRegistry() registry.register(api_source, resource_name="users") # Ingest -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams caster = Caster(schema) diff --git a/docs/reference/architecture/onto_sql.md b/docs/reference/architecture/onto_sql.md new file mode 100644 index 00000000..be6a4a94 --- /dev/null +++ b/docs/reference/architecture/onto_sql.md @@ -0,0 +1,3 @@ +# `graflo.architecture.onto_sql` + +::: graflo.architecture.onto_sql diff --git a/docs/reference/caster.md b/docs/reference/caster.md deleted file mode 100644 index 
13c3a46e..00000000 --- a/docs/reference/caster.md +++ /dev/null @@ -1,3 +0,0 @@ -# `graflo.caster` - -::: graflo.caster diff --git a/docs/reference/data_source/index.md b/docs/reference/data_source/index.md index a80f0d03..1037c458 100644 --- a/docs/reference/data_source/index.md +++ b/docs/reference/data_source/index.md @@ -160,7 +160,7 @@ source = DataSourceFactory.create_sql_data_source(config) ```python from graflo import Caster, DataSourceRegistry -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams registry = DataSourceRegistry() registry.register(file_source, resource_name="users") diff --git a/docs/reference/db/memgraph/__init__.md b/docs/reference/db/memgraph/__init__.md index 1c2e3e69..44c2c411 100644 --- a/docs/reference/db/memgraph/__init__.md +++ b/docs/reference/db/memgraph/__init__.md @@ -1,4 +1,3 @@ # `graflo.db.memgraph` ::: graflo.db.memgraph - diff --git a/docs/reference/db/memgraph/conn.md b/docs/reference/db/memgraph/conn.md index 8ca7893e..43beb4ec 100644 --- a/docs/reference/db/memgraph/conn.md +++ b/docs/reference/db/memgraph/conn.md @@ -1,4 +1,3 @@ # `graflo.db.memgraph.conn` ::: graflo.db.memgraph.conn - diff --git a/docs/reference/db/postgres/fuzzy_matcher.md b/docs/reference/db/postgres/fuzzy_matcher.md new file mode 100644 index 00000000..d373cb8f --- /dev/null +++ b/docs/reference/db/postgres/fuzzy_matcher.md @@ -0,0 +1,3 @@ +# `graflo.db.postgres.fuzzy_matcher` + +::: graflo.db.postgres.fuzzy_matcher diff --git a/docs/reference/db/postgres/heuristics.md b/docs/reference/db/postgres/heuristics.md new file mode 100644 index 00000000..29fedea2 --- /dev/null +++ b/docs/reference/db/postgres/heuristics.md @@ -0,0 +1,3 @@ +# `graflo.db.postgres.heuristics` + +::: graflo.db.postgres.heuristics diff --git a/docs/reference/db/postgres/inference_utils.md b/docs/reference/db/postgres/inference_utils.md new file mode 100644 index 00000000..8b31de6f --- /dev/null +++ 
b/docs/reference/db/postgres/inference_utils.md @@ -0,0 +1,3 @@ +# `graflo.db.postgres.inference_utils` + +::: graflo.db.postgres.inference_utils diff --git a/docs/reference/db/postgres/util.md b/docs/reference/db/postgres/util.md new file mode 100644 index 00000000..40707408 --- /dev/null +++ b/docs/reference/db/postgres/util.md @@ -0,0 +1,3 @@ +# `graflo.db.postgres.util` + +::: graflo.db.postgres.util diff --git a/docs/reference/db/tigergraph/conn.md b/docs/reference/db/tigergraph/conn.md index 8316fa78..eb5c631b 100644 --- a/docs/reference/db/tigergraph/conn.md +++ b/docs/reference/db/tigergraph/conn.md @@ -1,92 +1,3 @@ # `graflo.db.tigergraph.conn` -TigerGraph connection implementation for graph database operations. - ::: graflo.db.tigergraph.conn - -## Overview - -The `TigerGraphConnection` class provides a robust implementation for TigerGraph database operations, including schema definition, data ingestion, and query execution. It uses TigerGraph's SCHEMA_CHANGE job system for reliable schema management and REST++ API for efficient batch operations. 
- -## Key Features - -### Schema Definition - -- **SCHEMA_CHANGE Job Approach**: Uses TigerGraph's SCHEMA_CHANGE jobs for local schema definition within graphs - - More reliable than global vertex/edge creation - - Better integration with TigerGraph's graph-scoped schema model - - Automatic schema verification after creation - -- **Automatic Edge Discriminator Handling**: Automatically handles edge discriminators for multiple edges of the same type between the same vertices - - Automatically adds indexed fields to edge weights when missing (TigerGraph requirement) - - Ensures discriminator fields are also edge attributes - - Supports both explicit indexes and relation_field for backward compatibility - -### Data Ingestion - -- **Robust Batch Operations**: Enhanced batch vertex and edge insertion with automatic fallback - - Failed batch payloads automatically retry with individual upserts - - Preserves original edge data for fallback operations - - Better error recovery and data integrity - -- **Composite Primary Keys**: Supports both single-field PRIMARY_ID and composite PRIMARY KEY syntax - - Single-field indexes use PRIMARY_ID syntax (required by GSQL and GraphStudio) - - Composite keys use PRIMARY KEY syntax (works in GSQL, not GraphStudio UI) - -### Error Handling - -- **Improved Error Detection**: More lenient error detection and better error messages - - Case-insensitive vertex type comparison (handles TigerGraph capitalization) - - Detailed schema verification results in error messages - - Graceful handling of schema change job errors - -## Usage Example - -```python -from graflo import Caster, Schema -from graflo.db.connection.onto import TigergraphConfig - -# Load config from docker environment -config = TigergraphConfig.from_docker_env() - -# Create connection -conn = TigerGraphConnection(config) - -# Initialize database with schema -schema = Schema.from_dict(...) 
-conn.init_db(schema, clean_start=True) - -# Batch upsert vertices -conn.upsert_docs_batch(docs, "User", match_keys=["email"]) - -# Batch insert edges -conn.insert_edges_batch( - edges_data, - source_class="User", - target_class="Company", - relation_name="works_at", - match_keys_source=("email",), - match_keys_target=("name",) -) -``` - -## Schema Definition Details - -### Vertex Types - -Vertices are created using `ADD VERTEX` statements in SCHEMA_CHANGE jobs: - -- **Single-field primary key**: Uses `PRIMARY_ID` syntax with `PRIMARY_ID_AS_ATTRIBUTE="true"` for REST++ API compatibility -- **Composite primary key**: Uses `PRIMARY KEY` syntax (note: GraphStudio UI doesn't support composite keys) - -### Edge Types - -Edges are created using `ADD DIRECTED EDGE` statements with automatic discriminator handling: - -- **Discriminators**: Automatically added for all indexed fields to support multiple edges of the same type between the same vertices -- **Edge Attributes**: Discriminator fields are automatically added to edge weights if missing -- **Format**: `DISCRIMINATOR(field1 TYPE1, field2 TYPE2)` clause included in edge definition - -## API Reference - -See the auto-generated API documentation below for complete method signatures and parameters. 
diff --git a/docs/reference/db/tigergraph/onto.md b/docs/reference/db/tigergraph/onto.md new file mode 100644 index 00000000..f1c441ad --- /dev/null +++ b/docs/reference/db/tigergraph/onto.md @@ -0,0 +1,3 @@ +# `graflo.db.tigergraph.onto` + +::: graflo.db.tigergraph.onto diff --git a/docs/reference/hq/__init__.md b/docs/reference/hq/__init__.md new file mode 100644 index 00000000..1cb3a1d3 --- /dev/null +++ b/docs/reference/hq/__init__.md @@ -0,0 +1,3 @@ +# `graflo.hq` + +::: graflo.hq diff --git a/docs/reference/hq/caster.md b/docs/reference/hq/caster.md new file mode 100644 index 00000000..1c3199fa --- /dev/null +++ b/docs/reference/hq/caster.md @@ -0,0 +1,3 @@ +# `graflo.hq.caster` + +::: graflo.hq.caster diff --git a/docs/reference/hq/graph_engine.md b/docs/reference/hq/graph_engine.md new file mode 100644 index 00000000..1c0c8413 --- /dev/null +++ b/docs/reference/hq/graph_engine.md @@ -0,0 +1,3 @@ +# `graflo.hq.graph_engine` + +::: graflo.hq.graph_engine diff --git a/docs/reference/hq/inferencer.md b/docs/reference/hq/inferencer.md new file mode 100644 index 00000000..4a5d8266 --- /dev/null +++ b/docs/reference/hq/inferencer.md @@ -0,0 +1,3 @@ +# `graflo.hq.inferencer` + +::: graflo.hq.inferencer diff --git a/docs/reference/hq/resource_mapper.md b/docs/reference/hq/resource_mapper.md new file mode 100644 index 00000000..4e1a8ca4 --- /dev/null +++ b/docs/reference/hq/resource_mapper.md @@ -0,0 +1,3 @@ +# `graflo.hq.resource_mapper` + +::: graflo.hq.resource_mapper diff --git a/docs/reference/hq/sanitizer.md b/docs/reference/hq/sanitizer.md new file mode 100644 index 00000000..5cb1b911 --- /dev/null +++ b/docs/reference/hq/sanitizer.md @@ -0,0 +1,3 @@ +# `graflo.hq.sanitizer` + +::: graflo.hq.sanitizer diff --git a/examples/1-ingest-csv/ingest.py b/examples/1-ingest-csv/ingest.py index 2982883f..9d592c36 100644 --- a/examples/1-ingest-csv/ingest.py +++ b/examples/1-ingest-csv/ingest.py @@ -3,7 +3,7 @@ from graflo import Caster, Patterns, Schema from 
graflo.util.onto import FilePattern from graflo.db.connection.onto import ArangoConfig -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams schema = Schema.from_dict(FileHandle.load("schema.yaml")) diff --git a/examples/2-ingest-self-references/ingest.py b/examples/2-ingest-self-references/ingest.py index eb0670e5..aeb6b112 100644 --- a/examples/2-ingest-self-references/ingest.py +++ b/examples/2-ingest-self-references/ingest.py @@ -3,7 +3,7 @@ from graflo import Caster, Patterns, Schema from graflo.util.onto import FilePattern from graflo.db.connection.onto import ArangoConfig -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams schema = Schema.from_dict(FileHandle.load("schema.yaml")) diff --git a/examples/3-ingest-csv-edge-weights/ingest.py b/examples/3-ingest-csv-edge-weights/ingest.py index 33f28e66..7eb0f254 100644 --- a/examples/3-ingest-csv-edge-weights/ingest.py +++ b/examples/3-ingest-csv-edge-weights/ingest.py @@ -1,7 +1,7 @@ from suthing import FileHandle from graflo import Caster, Patterns, Schema from graflo.db.connection.onto import Neo4jConfig -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams import logging diff --git a/examples/4-ingest-neo4j/ingest.py b/examples/4-ingest-neo4j/ingest.py index a5caeeed..17ca42a0 100644 --- a/examples/4-ingest-neo4j/ingest.py +++ b/examples/4-ingest-neo4j/ingest.py @@ -3,7 +3,7 @@ from graflo import Caster, Patterns, Schema from graflo.util.onto import FilePattern from graflo.db.connection.onto import Neo4jConfig -from graflo.caster import IngestionParams +from graflo.hq.caster import IngestionParams schema = Schema.from_dict(FileHandle.load("schema.yaml")) diff --git a/examples/5-ingest-postgres/ingest.py b/examples/5-ingest-postgres/ingest.py index 28bebc11..2237357f 100644 --- a/examples/5-ingest-postgres/ingest.py +++ b/examples/5-ingest-postgres/ingest.py @@ -17,17 +17,12 @@ from graflo.onto import 
DBFlavor from graflo.db import DBType -from graflo import Caster from graflo.db.postgres import ( PostgresConnection, ) -from graflo.db.inferencer import ( - infer_schema_from_postgres, - create_patterns_from_postgres, -) +from graflo.hq import GraphEngine, IngestionParams from graflo.db.postgres.util import load_schema_from_sql_file from graflo.db.connection.onto import PostgresConfig, TigergraphConfig -from graflo.caster import IngestionParams logger = logging.getLogger(__name__) @@ -95,16 +90,19 @@ logger.warning(f"Mock schema file not found: {schema_file}") logger.warning("Assuming PostgreSQL database is already initialized") -# Step 3: Infer Schema from PostgreSQL database structure +# Step 3: Create GraphEngine to orchestrate schema inference, pattern creation, and ingestion +# GraphEngine coordinates all operations: schema inference, pattern mapping, and data ingestion +engine = GraphEngine( + target_db_flavor=db_flavor, ingestion_params=IngestionParams(clean_start=True) +) + +# Step 3.1: Infer Schema from PostgreSQL database structure # This automatically detects vertex-like and edge-like tables based on: # - Vertex tables: Have a primary key and descriptive columns # - Edge tables: Have 2+ foreign keys (representing relationships) # Connection is automatically closed when exiting the context with PostgresConnection(postgres_conf) as postgres_conn: - schema = infer_schema_from_postgres( - postgres_conn, schema_name="public", db_flavor=db_flavor - ) - + schema = engine.infer_schema(postgres_conn, schema_name="public") schema.general.name = "accounting" # Step 3.5: Dump inferred schema to YAML file @@ -117,17 +115,15 @@ # Step 4: Create Patterns from PostgreSQL tables # This maps PostgreSQL tables to resource patterns that Caster can use -# Connection is automatically closed when exiting the context -with PostgresConnection(postgres_conf) as postgres_conn: - patterns = create_patterns_from_postgres(postgres_conn, schema_name="public") - -# Step 5: Create 
Caster and ingest data -# Note: caster.ingest() will create its own PostgreSQL connections per table internally - -caster = Caster(schema) -ingestion_params = IngestionParams(clean_start=True) -caster.ingest( - output_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params +# Connection is automatically managed inside create_patterns() +patterns = engine.create_patterns(postgres_conf, schema_name="public") + +# Step 5: Ingest data using GraphEngine +# Note: ingestion will create its own PostgreSQL connections per table internally +engine.ingest( + schema=schema, + output_config=conn_conf, + patterns=patterns, ) print("\n" + "=" * 80) diff --git a/graflo/__init__.py b/graflo/__init__.py index c6768c0d..c1cc26bf 100644 --- a/graflo/__init__.py +++ b/graflo/__init__.py @@ -20,7 +20,7 @@ """ from .architecture import Index, Schema -from .caster import Caster +from graflo.hq.caster import Caster from .data_source import ( APIConfig, APIDataSource, diff --git a/graflo/cli/ingest.py b/graflo/cli/ingest.py index 2d1ddd48..3f8cad6c 100644 --- a/graflo/cli/ingest.py +++ b/graflo/cli/ingest.py @@ -137,7 +137,7 @@ def ingest( schema.fetch_resource() # Create ingestion params with CLI arguments - from graflo.caster import IngestionParams + from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams( n_cores=n_cores, diff --git a/graflo/db/__init__.py b/graflo/db/__init__.py index 9940b337..558bc272 100644 --- a/graflo/db/__init__.py +++ b/graflo/db/__init__.py @@ -32,16 +32,13 @@ from .neo4j.conn import Neo4jConnection from .postgres.conn import PostgresConnection from .tigergraph.conn import TigerGraphConnection -from .inferencer import infer_schema_from_postgres, create_patterns_from_postgres __all__ = [ "Connection", "ConnectionType", - "create_patterns_from_postgres", "DBType", "DBConfig", - "infer_schema_from_postgres", "ConnectionManager", "ArangoConnection", "FalkordbConnection", diff --git a/graflo/db/postgres/__init__.py 
b/graflo/db/postgres/__init__.py index d5f657dd..5d300e2b 100644 --- a/graflo/db/postgres/__init__.py +++ b/graflo/db/postgres/__init__.py @@ -10,11 +10,13 @@ - PostgresResourceMapper: Maps PostgreSQL tables to graflo Resources Example: - >>> from graflo.db.inferencer import infer_schema_from_postgres >>> >>> from graflo.db.postgres import PostgresConnection + >>> from graflo.hq import GraphEngine + >>> from graflo.db.postgres import PostgresConnection >>> from graflo.db.connection.onto import PostgresConfig >>> config = PostgresConfig.from_docker_env() >>> conn = PostgresConnection(config) - >>> schema = infer_schema_from_postgres(conn, schema_name="public") + >>> engine = GraphEngine() + >>> schema = engine.infer_schema(conn, schema_name="public") >>> conn.close() """ diff --git a/graflo/db/postgres/resource_mapping.py b/graflo/db/postgres/resource_mapping.py index 0363de28..bdd79bce 100644 --- a/graflo/db/postgres/resource_mapping.py +++ b/graflo/db/postgres/resource_mapping.py @@ -8,7 +8,7 @@ from graflo.architecture.resource import Resource from graflo.architecture.vertex import VertexConfig -from ..sanitizer import SchemaSanitizer +from graflo.hq.sanitizer import SchemaSanitizer from .conn import EdgeTableInfo, SchemaIntrospectionResult from .fuzzy_matcher import FuzzyMatchCache from .inference_utils import ( diff --git a/graflo/hq/__init__.py b/graflo/hq/__init__.py new file mode 100644 index 00000000..fc681d4e --- /dev/null +++ b/graflo/hq/__init__.py @@ -0,0 +1,20 @@ +"""High-level orchestration modules for graflo. + +This package provides high-level orchestration classes that coordinate +multiple components for graph database operations. 
+""" + +from graflo.hq.caster import Caster, IngestionParams +from graflo.hq.graph_engine import GraphEngine +from graflo.hq.inferencer import InferenceManager +from graflo.hq.resource_mapper import ResourceMapper +from graflo.hq.sanitizer import SchemaSanitizer + +__all__ = [ + "Caster", + "GraphEngine", + "IngestionParams", + "InferenceManager", + "ResourceMapper", + "SchemaSanitizer", +] diff --git a/graflo/caster.py b/graflo/hq/caster.py similarity index 100% rename from graflo/caster.py rename to graflo/hq/caster.py diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py new file mode 100644 index 00000000..b717ff26 --- /dev/null +++ b/graflo/hq/graph_engine.py @@ -0,0 +1,111 @@ +"""Graph engine for orchestrating schema inference, pattern creation, and ingestion. + +This module provides the GraphEngine class which serves as the main orchestrator +for graph database operations, coordinating between inference, pattern mapping, +and data ingestion. +""" + +import logging + +from graflo import Schema +from graflo.db import PostgresConnection +from graflo.db.connection.onto import DBConfig, PostgresConfig +from graflo.hq.caster import Caster, IngestionParams +from graflo.hq.inferencer import InferenceManager +from graflo.hq.resource_mapper import ResourceMapper +from graflo.onto import DBFlavor +from graflo.util.onto import Patterns + +logger = logging.getLogger(__name__) + + +class GraphEngine: + """Orchestrator for graph database operations. + + GraphEngine coordinates schema inference, pattern creation, and data ingestion, + providing a unified interface for working with graph databases. + + Attributes: + target_db_flavor: Target database flavor used for schema sanitization + ingestion_params: Default IngestionParams used when none is passed to ingest() + resource_mapper: ResourceMapper instance for pattern creation + """ + + def __init__( + self, + target_db_flavor: DBFlavor = DBFlavor.ARANGO, + ingestion_params: IngestionParams | None = None, + ): + """Initialize the GraphEngine.
+ + Args: + target_db_flavor: Target database flavor for schema sanitization + ingestion_params: IngestionParams instance for controlling ingestion behavior + """ + self.target_db_flavor = target_db_flavor + self.ingestion_params = ingestion_params or IngestionParams() + self.resource_mapper = ResourceMapper() + + def infer_schema( + self, + postgres_conn: PostgresConnection, + schema_name: str | None = None, + ) -> Schema: + """Infer a graflo Schema from PostgreSQL database. + + Args: + postgres_conn: PostgresConnection instance + schema_name: Schema name to introspect (defaults to config schema_name or 'public') + + Returns: + Schema: Inferred schema with vertices, edges, and resources + """ + inferencer = InferenceManager( + conn=postgres_conn, target_db_flavor=self.target_db_flavor + ) + return inferencer.infer_complete_schema(schema_name=schema_name) + + def create_patterns( + self, + postgres_config: PostgresConfig, + schema_name: str | None = None, + ) -> Patterns: + """Create Patterns from PostgreSQL tables. + + Args: + postgres_config: PostgresConfig instance + schema_name: Schema name to introspect + + Returns: + Patterns: Patterns object with TablePattern instances for all tables + """ + with PostgresConnection(postgres_config) as postgres_conn: + return self.resource_mapper.create_patterns_from_postgres( + conn=postgres_conn, schema_name=schema_name + ) + + def ingest( + self, + schema: Schema, + output_config: DBConfig, + patterns: "Patterns | None" = None, + ingestion_params: IngestionParams | None = None, + ) -> None: + """Ingest data into the graph database. + + Args: + schema: Schema configuration for the graph + output_config: Target database connection configuration + patterns: Patterns instance mapping resources to data sources. + If None, defaults to empty Patterns() + ingestion_params: IngestionParams instance with ingestion configuration. 
+ If None, uses the instance's default ingestion_params + """ + caster = Caster( + schema=schema, ingestion_params=ingestion_params or self.ingestion_params + ) + caster.ingest( + output_config=output_config, + patterns=patterns or Patterns(), + ingestion_params=ingestion_params or self.ingestion_params, + ) diff --git a/graflo/db/inferencer.py b/graflo/hq/inferencer.py similarity index 62% rename from graflo/db/inferencer.py rename to graflo/hq/inferencer.py index 38c4df1f..b610940c 100644 --- a/graflo/db/inferencer.py +++ b/graflo/hq/inferencer.py @@ -1,9 +1,8 @@ from graflo import Schema -from graflo.util.onto import Patterns, TablePattern from graflo.architecture import Resource from graflo.db import PostgresConnection from graflo.db.postgres import PostgresSchemaInferencer, PostgresResourceMapper -from graflo.db.sanitizer import SchemaSanitizer +from graflo.hq.sanitizer import SchemaSanitizer from graflo.onto import DBFlavor import logging @@ -126,81 +125,3 @@ def create_resources_for_schema( # Create resources return self.create_resources(introspection_result, schema) - - -def infer_schema_from_postgres( - conn: PostgresConnection, - schema_name: str | None = None, - db_flavor: DBFlavor = DBFlavor.ARANGO, -) -> Schema: - """Convenience function to infer a graflo Schema from PostgreSQL database. - - Args: - conn: PostgresConnection instance - schema_name: Schema name to introspect (defaults to config schema_name or 'public') - db_flavor: Target database flavor (defaults to ARANGO) - - Returns: - Schema: Inferred schema with vertices, edges, and resources - """ - manager = InferenceManager(conn, target_db_flavor=db_flavor) - return manager.infer_complete_schema(schema_name=schema_name) - - -def create_patterns_from_postgres( - conn: PostgresConnection, schema_name: str | None = None -) -> Patterns: - """Create Patterns from PostgreSQL tables. 
- - Args: - conn: PostgresConnection instance - schema_name: Schema name to introspect - - Returns: - Patterns: Patterns object with TablePattern instances for all tables - """ - - # Introspect the schema - introspection_result = conn.introspect_schema(schema_name=schema_name) - - # Create patterns - patterns = Patterns() - - # Get schema name - effective_schema = schema_name or introspection_result.schema_name - - # Store the connection config - config_key = "default" - patterns.postgres_configs[(config_key, effective_schema)] = conn.config - - # Add patterns for vertex tables - for table_info in introspection_result.vertex_tables: - table_name = table_info.name - table_pattern = TablePattern( - table_name=table_name, - schema_name=effective_schema, - resource_name=table_name, - ) - patterns.table_patterns[table_name] = table_pattern - patterns.postgres_table_configs[table_name] = ( - config_key, - effective_schema, - table_name, - ) - - # Add patterns for edge tables - for table_info in introspection_result.edge_tables: - table_name = table_info.name - table_pattern = TablePattern( - table_name=table_name, - schema_name=effective_schema, - resource_name=table_name, - ) - patterns.table_patterns[table_name] = table_pattern - patterns.postgres_table_configs[table_name] = ( - config_key, - effective_schema, - table_name, - ) - - return patterns diff --git a/graflo/hq/resource_mapper.py b/graflo/hq/resource_mapper.py new file mode 100644 index 00000000..cfe57a7e --- /dev/null +++ b/graflo/hq/resource_mapper.py @@ -0,0 +1,82 @@ +"""Resource mapper for creating Patterns from different data sources. + +This module provides functionality to create Patterns from various data sources +(PostgreSQL, files, etc.) that can be used for graph ingestion. 
+""" + +import logging + +from graflo.db import PostgresConnection +from graflo.util.onto import Patterns, TablePattern + +logger = logging.getLogger(__name__) + + +class ResourceMapper: + """Maps different data sources to Patterns for graph ingestion. + + This class provides methods to create Patterns from various data sources, + enabling a unified interface for pattern creation regardless of the source type. + """ + + def create_patterns_from_postgres( + self, conn: PostgresConnection, schema_name: str | None = None + ) -> Patterns: + """Create Patterns from PostgreSQL tables. + + Args: + conn: PostgresConnection instance + schema_name: Schema name to introspect + + Returns: + Patterns: Patterns object with TablePattern instances for all tables + """ + # Introspect the schema + introspection_result = conn.introspect_schema(schema_name=schema_name) + + # Create patterns + patterns = Patterns() + + # Get schema name + effective_schema = schema_name or introspection_result.schema_name + + # Store the connection config + config_key = "default" + patterns.postgres_configs[(config_key, effective_schema)] = conn.config + + # Add patterns for vertex tables + for table_info in introspection_result.vertex_tables: + table_name = table_info.name + table_pattern = TablePattern( + table_name=table_name, + schema_name=effective_schema, + resource_name=table_name, + ) + patterns.table_patterns[table_name] = table_pattern + patterns.postgres_table_configs[table_name] = ( + config_key, + effective_schema, + table_name, + ) + + # Add patterns for edge tables + for table_info in introspection_result.edge_tables: + table_name = table_info.name + table_pattern = TablePattern( + table_name=table_name, + schema_name=effective_schema, + resource_name=table_name, + ) + patterns.table_patterns[table_name] = table_pattern + patterns.postgres_table_configs[table_name] = ( + config_key, + effective_schema, + table_name, + ) + + return patterns + + # Future methods can be added here for other 
resource types: + # def create_patterns_from_files(...) -> Patterns: + # """Create Patterns from file sources.""" + # ... diff --git a/graflo/db/sanitizer.py b/graflo/hq/sanitizer.py similarity index 100% rename from graflo/db/sanitizer.py rename to graflo/hq/sanitizer.py diff --git a/pyproject.toml b/pyproject.toml index 5bfb1399..75f291a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,7 @@ description = "A framework for transforming tabular (CSV, SQL) and hierarchical name = "graflo" readme = "README.md" requires-python = ">=3.11" -version = "1.4.2" +version = "1.4.3" [project.optional-dependencies] plot = [ diff --git a/test/conftest.py b/test/conftest.py index 71657ee8..147d77bc 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -10,7 +10,7 @@ from graflo.architecture.schema import Schema from graflo.architecture.util import cast_graph_name_to_triple -from graflo.caster import Caster +from graflo.hq.caster import Caster from graflo.util.misc import sorted_dicts from graflo.util.onto import Patterns, FilePattern @@ -71,7 +71,7 @@ def ingest_atomic(conn_conf, current_path, test_db_name, schema_o, mode, n_cores ) patterns.add_file_pattern(resource_name, file_pattern) - from graflo.caster import IngestionParams + from graflo.hq.caster import IngestionParams caster = Caster(schema_o) ingestion_params = IngestionParams(n_cores=n_cores, clean_start=True) diff --git a/test/data_source/test_api_data_source.py b/test/data_source/test_api_data_source.py index f536a034..c183d071 100644 --- a/test/data_source/test_api_data_source.py +++ b/test/data_source/test_api_data_source.py @@ -5,7 +5,7 @@ import pytest -from graflo.caster import Caster +from graflo.hq.caster import Caster from graflo.data_source import ( APIConfig, APIDataSource, diff --git a/test/db/postgres/test_schema_inference.py b/test/db/postgres/test_schema_inference.py index 4e05b784..b5c2dbfe 100644 --- a/test/db/postgres/test_schema_inference.py +++ 
b/test/db/postgres/test_schema_inference.py @@ -1,20 +1,22 @@ """Tests for PostgreSQL schema inference functionality. This module tests the schema inference capabilities, including: -- infer_schema_from_postgres() convenience function +- GraphEngine.infer_schema_from_postgres() method - Vertex and edge detection - Field type mapping - Resource creation """ -from graflo.db import infer_schema_from_postgres +from graflo.hq import GraphEngine +from graflo.onto import DBFlavor def test_infer_schema_from_postgres(postgres_conn, load_mock_schema): """Test that infer_schema_from_postgres correctly infers schema from PostgreSQL.""" _ = load_mock_schema # Ensure schema is loaded - schema = infer_schema_from_postgres(postgres_conn, schema_name="public") + engine = GraphEngine(target_db_flavor=DBFlavor.ARANGO) + schema = engine.infer_schema(postgres_conn, schema_name="public") # Verify schema structure assert schema is not None @@ -179,7 +181,8 @@ def test_infer_schema_with_pg_catalog_fallback(postgres_conn, load_mock_schema): try: # Test that infer_schema_from_postgres works with pg_catalog fallback - schema = infer_schema_from_postgres(postgres_conn, schema_name="public") + engine = GraphEngine(target_db_flavor=DBFlavor.ARANGO) + schema = engine.infer_schema(postgres_conn, schema_name="public") # Verify schema structure assert schema is not None, "Schema should be inferred" diff --git a/test/db/tigergraphs/test_reserved_words.py b/test/db/tigergraphs/test_reserved_words.py index da3b3f6b..88d26933 100644 --- a/test/db/tigergraphs/test_reserved_words.py +++ b/test/db/tigergraphs/test_reserved_words.py @@ -22,7 +22,7 @@ from graflo.onto import DBFlavor from test.conftest import fetch_schema_obj -from graflo.db.sanitizer import SchemaSanitizer +from graflo.hq.sanitizer import SchemaSanitizer logger = logging.getLogger(__name__) diff --git a/test/test_caster.py b/test/test_caster.py index df1c7a86..bb160344 100644 --- a/test/test_caster.py +++ b/test/test_caster.py @@ -7,7 +7,7 
@@ import pytest from suthing import FileHandle -from graflo.caster import Caster +from graflo.hq.caster import Caster logger = logging.getLogger(__name__) diff --git a/uv.lock b/uv.lock index 29c44013..c1c739e0 100644 --- a/uv.lock +++ b/uv.lock @@ -348,7 +348,7 @@ wheels = [ [[package]] name = "graflo" -version = "1.4.2" +version = "1.4.3" source = { editable = "." } dependencies = [ { name = "click" },