diff --git a/docs/examples/example-5.md b/docs/examples/example-5.md index a0b1ee8..ed7b4e6 100644 --- a/docs/examples/example-5.md +++ b/docs/examples/example-5.md @@ -265,7 +265,7 @@ Automatically generate a graflo Schema from your PostgreSQL database. This is th from graflo.hq import GraphEngine from graflo.onto import DBFlavor -from graflo.db.connection.onto import ArangoConfig, Neo4jConfig, TigergraphConfig, FalkordbConfig +from graflo.db.connection.onto import ArangoConfig, Neo4jConfig, TigergraphConfig, FalkordbConfig, PostgresConfig from graflo.db import DBType # Connect to target graph database to determine flavor @@ -281,9 +281,11 @@ db_flavor = ( ) # Create GraphEngine and infer schema automatically +# Connection is automatically managed inside infer_schema() +postgres_conf = PostgresConfig.from_docker_env() engine = GraphEngine(target_db_flavor=db_flavor) schema = engine.infer_schema( - postgres_conn, + postgres_conf, schema_name="public", # PostgreSQL schema name ) ``` @@ -405,9 +407,6 @@ import yaml from graflo import Caster from graflo.onto import DBFlavor from graflo.db import DBType -from graflo.db.postgres import ( - PostgresConnection, -) from graflo.hq import GraphEngine from graflo.db.connection.onto import ArangoConfig, PostgresConfig @@ -415,7 +414,6 @@ logger = logging.getLogger(__name__) # Step 1: Connect to PostgreSQL (source database) postgres_conf = PostgresConfig.from_docker_env() -postgres_conn = PostgresConnection(postgres_conf) # Step 2: Initialize database with mock schema if needed # (Implementation details omitted - see full example in examples/5-ingest-postgres/ingest.py) @@ -427,6 +425,7 @@ from graflo.db.connection.onto import ArangoConfig, Neo4jConfig, TigergraphConfi target_config = ArangoConfig.from_docker_env() # or Neo4jConfig, TigergraphConfig, FalkordbConfig # Step 4: Infer Schema from PostgreSQL database structure +# Connection is automatically managed inside infer_schema() db_type = target_config.connection_type db_flavor = ( DBFlavor(db_type.value) @@ -437,7 +436,7 @@ db_flavor = ( # Create GraphEngine and infer schema engine = GraphEngine(target_db_flavor=db_flavor) schema = engine.infer_schema( - postgres_conn, + postgres_conf, schema_name="public", ) @@ -715,8 +714,10 @@ After inference, you can modify the schema: ```python # Create GraphEngine and infer schema +# Connection is automatically managed inside infer_schema() +postgres_conf = PostgresConfig.from_docker_env() engine = GraphEngine() -schema = engine.infer_schema(postgres_conn, schema_name="public") +schema = engine.infer_schema(postgres_conf, schema_name="public") # Modify schema as needed # Add custom transforms, filters, or additional edges diff --git a/docs/getting_started/quickstart.md b/docs/getting_started/quickstart.md index 8c67ae7..3de534b 100644 --- a/docs/getting_started/quickstart.md +++ b/docs/getting_started/quickstart.md @@ -108,17 +108,16 @@ The `ingest()` method takes: You can ingest data directly from PostgreSQL tables. 
First, infer the schema from your PostgreSQL database: ```python -from graflo.db.postgres import PostgresConnection from graflo.hq import GraphEngine from graflo.db.connection.onto import PostgresConfig # Connect to PostgreSQL pg_config = PostgresConfig.from_docker_env() # Or from_env(), or create directly -pg_conn = PostgresConnection(pg_config) # Create GraphEngine and infer schema from PostgreSQL (automatically detects vertices and edges) +# Connection is automatically managed inside infer_schema() engine = GraphEngine() -schema = engine.infer_schema(pg_conn, schema_name="public") +schema = engine.infer_schema(pg_config, schema_name="public") # Create patterns from PostgreSQL tables engine = GraphEngine() diff --git a/docs/reference/db/postgres/fuzzy_matcher.md b/docs/reference/db/postgres/fuzzy_matcher.md deleted file mode 100644 index d373cb8..0000000 --- a/docs/reference/db/postgres/fuzzy_matcher.md +++ /dev/null @@ -1,3 +0,0 @@ -# `graflo.db.postgres.fuzzy_matcher` - -::: graflo.db.postgres.fuzzy_matcher diff --git a/docs/reference/hq/fuzzy_matcher.md b/docs/reference/hq/fuzzy_matcher.md new file mode 100644 index 0000000..d99ef9e --- /dev/null +++ b/docs/reference/hq/fuzzy_matcher.md @@ -0,0 +1,3 @@ +# `graflo.hq.fuzzy_matcher` + +::: graflo.hq.fuzzy_matcher diff --git a/examples/3-ingest-csv-edge-weights/ingest.py b/examples/3-ingest-csv-edge-weights/ingest.py index 7eb0f25..a0cf9bc 100644 --- a/examples/3-ingest-csv-edge-weights/ingest.py +++ b/examples/3-ingest-csv-edge-weights/ingest.py @@ -1,7 +1,8 @@ from suthing import FileHandle from graflo import Caster, Patterns, Schema -from graflo.db.connection.onto import Neo4jConfig from graflo.hq.caster import IngestionParams +from graflo.db.connection.onto import Neo4jConfig + import logging diff --git a/examples/5-ingest-postgres/ingest.py b/examples/5-ingest-postgres/ingest.py index 2237357..86d1ff7 100644 --- a/examples/5-ingest-postgres/ingest.py +++ b/examples/5-ingest-postgres/ingest.py @@ -17,9 +17,6 @@ from graflo.onto import DBFlavor from graflo.db import DBType -from graflo.db.postgres import ( - PostgresConnection, -) from graflo.hq import GraphEngine, IngestionParams from graflo.db.postgres.util import load_schema_from_sql_file from graflo.db.connection.onto import PostgresConfig, TigergraphConfig @@ -29,7 +26,7 @@ # Configure logging: INFO level for graflo module, WARNING for others logging.basicConfig(level=logging.WARNING, handlers=[logging.StreamHandler()]) # Set graflo module to INFO level -logging.getLogger("graflo").setLevel(logging.INFO) +logging.getLogger("graflo").setLevel(logging.DEBUG) # Step 1: Connect to PostgreSQL (source database) # Load PostgreSQL config from docker/postgres/.env (recommended) @@ -100,9 +97,12 @@ # This automatically detects vertex-like and edge-like tables based on: # - Vertex tables: Have a primary key and descriptive columns # - Edge tables: Have 2+ foreign keys (representing relationships) -# Connection is automatically closed when exiting the context -with PostgresConnection(postgres_conf) as postgres_conn: - schema = engine.infer_schema(postgres_conn, schema_name="public") +# Connection is automatically managed inside infer_schema() +# Optionally specify fuzzy_threshold (0.0 to 1.0) to control fuzzy matching sensitivity: +# - Higher values (e.g., 0.9) = stricter matching, fewer matches +# - Lower values (e.g., 0.7) = more lenient matching, more matches +# Default is 0.8 +schema = engine.infer_schema(postgres_conf, schema_name="public", fuzzy_threshold=0.8) schema.general.name = 
"accounting" # Step 3.5: Dump inferred schema to YAML file diff --git a/examples/5-ingest-postgres/tigergraph/triples.gsql b/examples/5-ingest-postgres/tigergraph/triples.gsql index 80072ba..7591c2c 100644 --- a/examples/5-ingest-postgres/tigergraph/triples.gsql +++ b/examples/5-ingest-postgres/tigergraph/triples.gsql @@ -1,22 +1,17 @@ -CREATE OR REPLACE QUERY checkAll() FOR GRAPH public { +CREATE OR REPLACE QUERY triples() FOR GRAPH accounting { -TYPEDEF TUPLE TRIPLE; +TYPEDEF TUPLE TRIPLE; ListAccum @@triple_list; - /* 2. Start with users */ Seed = {users.*}; - /* 3. Capture follows (User -> User) */ S1 = SELECT t FROM Seed:s -(follows:e)-> users:t - ACCUM @@triple_list += TRIPLE(s.name, "follows", t.name); + ACCUM @@triple_list += TRIPLE(s, e, t); - /* 4. Capture purchases (User -> Product) */ S2 = SELECT t FROM Seed:s -(purchases:e)-> products:t - ACCUM @@triple_list += TRIPLE(s.name, "purchases", t.name); + ACCUM @@triple_list += TRIPLE(s, e, t); - /* 5. Print only the list */ - PRINT PRINT @@triple_list; } \ No newline at end of file diff --git a/graflo/db/postgres/conn.py b/graflo/db/postgres/conn.py index 926f622..5e356b8 100644 --- a/graflo/db/postgres/conn.py +++ b/graflo/db/postgres/conn.py @@ -34,8 +34,9 @@ ) from graflo.db.connection.onto import PostgresConfig +from graflo.hq.fuzzy_matcher import FuzzyMatcher + from .inference_utils import ( - FuzzyMatchCache, infer_edge_vertices_from_table_name, infer_vertex_from_column_name, ) @@ -747,8 +748,9 @@ def detect_edge_tables( vertex_tables = self.detect_vertex_tables(schema_name) vertex_table_names = [vt.name for vt in vertex_tables] - # Create fuzzy match cache once for all tables (significant performance improvement) - match_cache = FuzzyMatchCache(vertex_table_names) + # Create fuzzy matcher once for all tables (significant performance improvement) + # Caching is enabled by default for better performance + matcher = FuzzyMatcher(vertex_table_names, threshold=0.6, enable_cache=True) tables = self.get_tables(schema_name) edge_tables = [] @@ -819,7 +821,7 @@ def detect_edge_tables( for fk in fk_infos ] _, _, relation_name = infer_edge_vertices_from_table_name( - table_name, pk_columns, fk_dicts, vertex_table_names, match_cache + table_name, pk_columns, fk_dicts, vertex_table_names, matcher ) # If we have 2 or more primary keys, try to infer from table name and structure elif len(pk_columns) >= 2: @@ -839,7 +841,7 @@ def detect_edge_tables( pk_columns, fk_dicts, vertex_table_names, - match_cache, + matcher, ) ) @@ -882,10 +884,10 @@ def detect_edge_tables( ) # Use robust inference logic to extract vertex names from column names source_table = infer_vertex_from_column_name( - source_column, vertex_table_names, match_cache + source_column, vertex_table_names, matcher ) target_table = infer_vertex_from_column_name( - target_column, vertex_table_names, match_cache + target_column, vertex_table_names, matcher ) # Only add if we have source and target information diff --git a/graflo/db/postgres/inference_utils.py b/graflo/db/postgres/inference_utils.py index fabbd7d..1e55b9b 100644 --- a/graflo/db/postgres/inference_utils.py +++ b/graflo/db/postgres/inference_utils.py @@ -6,7 +6,7 @@ from typing import Any -from .fuzzy_matcher import FuzzyMatchCache, FuzzyMatcher +from graflo.hq.fuzzy_matcher import FuzzyMatcher def fuzzy_match_fragment( @@ -102,13 +102,13 @@ def _extract_key_fragments( def _match_vertices_from_table_fragments( table_fragments: list[str], - match_cache: FuzzyMatchCache, + matcher: FuzzyMatcher, ) -> tuple[int | None, int 
| None, str | None, str | None, set[str]]: """Match vertices from table name fragments using left-to-right and right-to-left strategy. Args: table_fragments: List of fragments from table name - match_cache: Fuzzy match cache for vertex matching + matcher: Fuzzy matcher for vertex matching Returns: Tuple of (source_match_idx, target_match_idx, source_vertex, target_vertex, matched_vertices_set) @@ -121,7 +121,7 @@ def _match_vertices_from_table_fragments( # Match source starting from the left for i, fragment in enumerate(table_fragments): - matched = match_cache.get_match(fragment) + matched = matcher.get_match(fragment) if matched and matched not in matched_vertices_set: source_match_idx = i source_vertex = matched @@ -135,7 +135,7 @@ def _match_vertices_from_table_fragments( -1, ): fragment = table_fragments[i] - matched = match_cache.get_match(fragment) + matched = matcher.get_match(fragment) if matched: target_match_idx = i target_vertex = matched @@ -153,7 +153,7 @@ def _match_vertices_from_table_fragments( def _match_vertices_from_key_fragments( key_fragments: list[str], - match_cache: FuzzyMatchCache, + matcher: FuzzyMatcher, matched_vertices_set: set[str], source_vertex: str | None, target_vertex: str | None, @@ -162,7 +162,7 @@ def _match_vertices_from_key_fragments( Args: key_fragments: List of fragments extracted from key columns - match_cache: Fuzzy match cache for vertex matching + matcher: Fuzzy matcher for vertex matching matched_vertices_set: Set of already matched vertices (will be updated) source_vertex: Source vertex matched from table name (if any) target_vertex: Target vertex matched from table name (if any) @@ -181,7 +181,7 @@ def _match_vertices_from_key_fragments( # Match key fragments for fragment in key_fragments: - matched = match_cache.get_match(fragment) + matched = matcher.get_match(fragment) if matched: if matched not in matched_vertices_set: matched_vertices.append(matched) @@ -386,7 +386,7 @@ def infer_edge_vertices_from_table_name( pk_columns: list[str], fk_columns: list[dict[str, Any]], vertex_table_names: list[str] | None = None, - match_cache: FuzzyMatchCache | None = None, + matcher: FuzzyMatcher | None = None, ) -> tuple[str | None, str | None, str | None]: """Infer source and target vertex names from table name and structure. 
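As a concrete illustration of the inference path above, here is a minimal sketch of calling `infer_edge_vertices_from_table_name` with the relocated `FuzzyMatcher`. The import paths follow the file moves in this diff, and the table, column, and vertex names are made up:

```python
from graflo.hq.fuzzy_matcher import FuzzyMatcher
from graflo.db.postgres.inference_utils import infer_edge_vertices_from_table_name

vertex_tables = ["users", "products"]
# One matcher is shared across calls; caching is on by default after this change.
matcher = FuzzyMatcher(vertex_tables, threshold=0.6, enable_cache=True)

source, target, relation = infer_edge_vertices_from_table_name(
    "user_purchases_product",                     # hypothetical edge table name
    pk_columns=["user_id", "product_id"],
    fk_columns=[
        {"column": "user_id", "references_table": "users"},
        {"column": "product_id", "references_table": "products"},
    ],
    vertex_table_names=vertex_tables,
    matcher=matcher,
)
# Illustratively: source == "users", target == "products", relation == "purchases"
```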
@@ -402,7 +402,7 @@ def infer_edge_vertices_from_table_name( pk_columns: List of primary key column names fk_columns: List of foreign key dictionaries with 'column' and 'references_table' keys vertex_table_names: Optional list of known vertex table names for fuzzy matching - match_cache: Optional pre-computed fuzzy match cache for better performance + matcher: Optional pre-computed fuzzy matcher for better performance (with caching enabled) Returns: Tuple of (source_table, target_table, relation_name) or (None, None, None) if cannot infer @@ -410,9 +410,9 @@ def infer_edge_vertices_from_table_name( if vertex_table_names is None: vertex_table_names = [] - # Use cache if provided, otherwise create a temporary one - if match_cache is None: - match_cache = FuzzyMatchCache(vertex_table_names) + # Use matcher if provided, otherwise create a temporary one + if matcher is None: + matcher = FuzzyMatcher(vertex_table_names, threshold=0.6, enable_cache=True) # Step 1: Detect separator and split table name separator = detect_separator(table_name) @@ -428,11 +428,11 @@ def infer_edge_vertices_from_table_name( source_vertex, target_vertex, matched_vertices_set, - ) = _match_vertices_from_table_fragments(table_fragments, match_cache) + ) = _match_vertices_from_table_fragments(table_fragments, matcher) # Step 4: Match vertices from key fragments matched_vertices, key_matched_vertices = _match_vertices_from_key_fragments( - key_fragments, match_cache, matched_vertices_set, source_vertex, target_vertex + key_fragments, matcher, matched_vertices_set, source_vertex, target_vertex ) # Step 5: Extract FK vertex names @@ -459,14 +459,14 @@ def infer_edge_vertices_from_table_name( def _match_fragments_excluding_suffixes( fragments: list[str], - match_cache: FuzzyMatchCache, + matcher: FuzzyMatcher, common_suffixes: set[str], ) -> str | None: """Match fragments to vertices, excluding common suffixes. Args: fragments: List of fragments to match - match_cache: Fuzzy match cache for vertex matching + matcher: Fuzzy matcher for vertex matching common_suffixes: Set of common suffix strings to skip Returns: @@ -478,7 +478,7 @@ def _match_fragments_excluding_suffixes( if fragment_lower in common_suffixes: continue - matched = match_cache.get_match(fragment) + matched = matcher.get_match(fragment) if matched: return matched return None @@ -487,7 +487,7 @@ def _match_fragments_excluding_suffixes( def _try_match_without_suffix( fragments: list[str], separator: str, - match_cache: FuzzyMatchCache, + matcher: FuzzyMatcher, common_suffixes: set[str], ) -> str | None: """Try matching fragments after removing common suffix. @@ -495,7 +495,7 @@ def _try_match_without_suffix( Args: fragments: List of fragments separator: Separator character - match_cache: Fuzzy match cache for vertex matching + matcher: Fuzzy matcher for vertex matching common_suffixes: Set of common suffix strings Returns: @@ -506,7 +506,7 @@ def _try_match_without_suffix( if last_fragment in common_suffixes: # Try matching the remaining fragments remaining = separator.join(fragments[:-1]) - matched = match_cache.get_match(remaining) + matched = matcher.get_match(remaining) if matched: return matched return None @@ -548,7 +548,7 @@ def _try_exact_match_with_suffix_removal( def infer_vertex_from_column_name( column_name: str, vertex_table_names: list[str] | None = None, - match_cache: FuzzyMatchCache | None = None, + matcher: FuzzyMatcher | None = None, ) -> str | None: """Infer vertex table name from a column name using robust pattern matching. 
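Similarly, a small sketch of `infer_vertex_from_column_name` driven by the same shared matcher; the column and vertex names here are hypothetical, and the printed results are only what the heuristics would plausibly produce:

```python
from graflo.hq.fuzzy_matcher import FuzzyMatcher
from graflo.db.postgres.inference_utils import infer_vertex_from_column_name

vertex_tables = ["users", "products"]
matcher = FuzzyMatcher(vertex_tables, threshold=0.6, enable_cache=True)

# The full column name, its fragments, and suffix-stripped forms are tried in turn.
for column in ["user_id", "bla_user", "product_fk"]:
    print(column, "->", infer_vertex_from_column_name(column, vertex_tables, matcher))
# Illustratively: user_id -> users, bla_user -> users, product_fk -> products
```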
@@ -562,7 +562,7 @@ def infer_vertex_from_column_name( Args: column_name: Name of the column vertex_table_names: Optional list of known vertex table names for fuzzy matching - match_cache: Optional pre-computed fuzzy match cache for better performance + matcher: Optional pre-computed fuzzy matcher for better performance (with caching enabled) Returns: Inferred vertex table name or None if cannot infer @@ -570,9 +570,9 @@ def infer_vertex_from_column_name( if vertex_table_names is None: vertex_table_names = [] - # Use cache if provided, otherwise create a temporary one - if match_cache is None: - match_cache = FuzzyMatchCache(vertex_table_names) + # Use matcher if provided, otherwise create a temporary one + if matcher is None: + matcher = FuzzyMatcher(vertex_table_names, threshold=0.6, enable_cache=True) if not column_name: return None @@ -581,7 +581,7 @@ def infer_vertex_from_column_name( common_suffixes = {"id", "fk", "key", "pk", "ref", "reference"} # Step 1: Try matching full column name first - matched = match_cache.get_match(column_name) + matched = matcher.get_match(column_name) if matched: return matched @@ -593,16 +593,12 @@ def infer_vertex_from_column_name( return None # Step 3: Try matching fragments (excluding common suffixes) - matched = _match_fragments_excluding_suffixes( - fragments, match_cache, common_suffixes - ) + matched = _match_fragments_excluding_suffixes(fragments, matcher, common_suffixes) if matched: return matched # Step 4: Try removing common suffix and matching again - matched = _try_match_without_suffix( - fragments, separator, match_cache, common_suffixes - ) + matched = _try_match_without_suffix(fragments, separator, matcher, common_suffixes) if matched: return matched diff --git a/graflo/db/postgres/resource_mapping.py b/graflo/db/postgres/resource_mapping.py index bdd79bc..c682849 100644 --- a/graflo/db/postgres/resource_mapping.py +++ b/graflo/db/postgres/resource_mapping.py @@ -8,9 +8,8 @@ from graflo.architecture.resource import Resource from graflo.architecture.vertex import VertexConfig -from graflo.hq.sanitizer import SchemaSanitizer from .conn import EdgeTableInfo, SchemaIntrospectionResult -from .fuzzy_matcher import FuzzyMatchCache +from graflo.hq.fuzzy_matcher import FuzzyMatcher from .inference_utils import ( detect_separator, split_by_separator, @@ -26,6 +25,14 @@ class PostgresResourceMapper: and edges, enabling ingestion of relational data into graph databases. """ + def __init__(self, fuzzy_threshold: float = 0.8): + """Initialize the resource mapper. + + Args: + fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8) + """ + self.fuzzy_threshold = fuzzy_threshold + def create_vertex_resource(self, table_name: str, vertex_name: str) -> Resource: """Create a Resource for a vertex table. @@ -55,14 +62,14 @@ def create_edge_resource( self, edge_table_info: EdgeTableInfo, vertex_config: VertexConfig, - match_cache: FuzzyMatchCache | None = None, + matcher: FuzzyMatcher, ) -> Resource: """Create a Resource for an edge table. 
Args: edge_table_info: Edge table information from introspection vertex_config: Vertex configuration for source/target validation - match_cache: Optional fuzzy match cache for better performance + matcher: Optional fuzzy matcher for better performance (with caching enabled) Returns: Resource: Resource configured to ingest edge data @@ -101,12 +108,17 @@ def create_edge_resource( # Use heuristics to infer PK field names from column names # This handles cases like "bla_user" -> "user" vertex -> use "id" or matched field - vertex_names = list(vertex_config.vertex_set) source_pk_field = self._infer_pk_field_from_column( - source_column, source_table, source_pk_fields, vertex_names, match_cache + source_column, + source_table, + source_pk_fields, + matcher, ) target_pk_field = self._infer_pk_field_from_column( - target_column, target_table, target_pk_fields, vertex_names, match_cache + target_column, + target_table, + target_pk_fields, + matcher, ) # Create apply list using source_vertex and target_vertex pattern @@ -145,13 +157,12 @@ def create_edge_resource( return resource - @staticmethod def _infer_pk_field_from_column( + self, column_name: str, vertex_name: str, pk_fields: list[str], - vertex_names: list[str], - match_cache: FuzzyMatchCache | None = None, + matcher: FuzzyMatcher, ) -> str: """Infer primary key field name from column name using heuristics. @@ -173,8 +184,7 @@ def _infer_pk_field_from_column( column_name: Name of the column (e.g., "user_id", "bla_user", "bla_user_2") vertex_name: Name of the target vertex (already known from edge table info) pk_fields: List of primary key field names for the vertex - vertex_names: List of all vertex names for fuzzy matching - match_cache: Optional fuzzy match cache for better performance + matcher: Optional fuzzy matcher for better performance (with caching enabled) Returns: Primary key field name (defaults to first PK field or "id" if no match) @@ -187,14 +197,7 @@ def _infer_pk_field_from_column( # This confirms that the column is indeed related to this vertex for fragment in fragments: # Fuzzy match fragment to vertex names - if match_cache: - matched_vertex = match_cache.get_match(fragment) - else: - # Fallback: create temporary matcher if cache not provided - from .fuzzy_matcher import FuzzyMatcher - - matcher = FuzzyMatcher(vertex_names) - matched_vertex, _ = matcher.match(fragment) + matched_vertex = matcher.get_match(fragment) # If we found a match to our target vertex, use its PK field if matched_vertex == vertex_name: @@ -223,7 +226,7 @@ def map_tables_to_resources( self, introspection_result: SchemaIntrospectionResult, vertex_config: VertexConfig, - sanitizer: SchemaSanitizer, + fuzzy_threshold: float | None = None, ) -> list[Resource]: """Map all PostgreSQL tables to Resources. 
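The `fuzzy_threshold` that `PostgresResourceMapper` and `map_tables_to_resources` now accept is forwarded into a shared `FuzzyMatcher`, so its effect can be previewed in isolation. A minimal sketch with made-up vertex names (only the `FuzzyMatcher` API shown in this diff is used):

```python
from graflo.hq.fuzzy_matcher import FuzzyMatcher

vertex_names = ["users", "products", "orders"]

strict = FuzzyMatcher(vertex_names, threshold=0.9, enable_cache=True)
lenient = FuzzyMatcher(vertex_names, threshold=0.7, enable_cache=True)

# match() returns (best_match, score); get_match() is the cached convenience form.
print(strict.match("usrs"))           # may fall below 0.9 and return (None, 0.0)
print(lenient.match("usrs"))          # more likely to resolve to "users"
print(lenient.get_match("order_id"))  # illustratively "orders"
```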
@@ -234,15 +237,20 @@ def map_tables_to_resources( introspection_result: Result from PostgresConnection.introspect_schema() vertex_config: Inferred vertex configuration sanitizer: carries mappiings + fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0) Returns: list[Resource]: List of Resources for all tables """ resources = [] - # Create fuzzy match cache once for all edge tables (significant performance improvement) + # Create fuzzy matcher once for all edge tables (significant performance improvement) + # Caching is enabled by default for better performance vertex_names = list(vertex_config.vertex_set) - match_cache = FuzzyMatchCache(vertex_names) + threshold = ( + fuzzy_threshold if fuzzy_threshold is not None else self.fuzzy_threshold + ) + matcher = FuzzyMatcher(vertex_names, threshold, enable_cache=True) # Map vertex tables to resources vertex_tables = introspection_result.vertex_tables @@ -257,7 +265,7 @@ def map_tables_to_resources( for edge_table_info in edge_tables: try: resource = self.create_edge_resource( - edge_table_info, vertex_config, match_cache + edge_table_info, vertex_config, matcher ) resources.append(resource) except ValueError as e: diff --git a/graflo/db/tigergraph/conn.py b/graflo/db/tigergraph/conn.py index 6101d5b..d6815c4 100644 --- a/graflo/db/tigergraph/conn.py +++ b/graflo/db/tigergraph/conn.py @@ -933,7 +933,10 @@ def _get_edge_types( def _get_installed_queries(self, graph_name: str | None = None) -> list[str]: """ - Get list of installed queries using GSQL. + Get list of installed queries using REST API. + + Uses the /endpoints endpoint with dynamic=true to get all installed query endpoints, + then extracts query names from the endpoint paths. Args: graph_name: Name of the graph (defaults to self.graphname) @@ -943,20 +946,37 @@ def _get_installed_queries(self, graph_name: str | None = None) -> list[str]: """ graph_name = graph_name or self.graphname try: - result = self._execute_gsql(f"USE GRAPH {graph_name}\nSHOW QUERY *") - # Parse GSQL output to extract query names + # Use REST API endpoint to get dynamic endpoints (installed queries) + # Format: GET /endpoints?dynamic=true + endpoint = "/endpoints" + params = {"dynamic": "true"} + result = self._call_restpp_api(endpoint, method="GET", params=params) + + # Parse the response to extract query names + # The response is a dict where keys are endpoint paths like: + # "POST /query/{graph_name}/{query_name}" or "GET /query/{graph_name}/{query_name}" queries = [] - if isinstance(result, str): - lines = result.split("\n") - for line in lines: - line = line.strip() - if line and not line.startswith("#") and not line.startswith("USE"): - # Query names are typically on their own lines - if line and not line.startswith("---"): - queries.append(line) - return queries if queries else [] + if isinstance(result, dict): + query_prefix = f"/query/{graph_name}/" + for endpoint_path in result.keys(): + # Extract query name from endpoint path + # Format: "POST /query/{graph_name}/{query_name}" or "GET /query/{graph_name}/{query_name}" + if query_prefix in endpoint_path: + # Extract the query name after the graph name + # Handle both "POST /query/..." and "/query/..." 
formats + idx = endpoint_path.find(query_prefix) + if idx >= 0: + query_part = endpoint_path[idx + len(query_prefix) :] + # Extract query name (everything up to first space, newline, or end) + query_name = query_part.split()[0] if query_part else "" + # Remove any trailing slashes or special characters + query_name = query_name.rstrip("/").strip() + if query_name and query_name not in queries: + queries.append(query_name) + + return queries except Exception as e: - logger.debug(f"Failed to get installed queries via GSQL: {e}") + logger.debug(f"Failed to get installed queries via REST API: {e}") return [] def _run_installed_query( @@ -1489,11 +1509,9 @@ def delete_database(self, name: str): """ Delete a TigerGraph database (graph). - This method attempts to drop the graph using GSQL DROP GRAPH. - If that fails (e.g., dependencies), it will: - 1) Remove associations and drop all edge types - 2) Drop all vertex types - 3) Clear remaining data as a last resort + This method attempts to drop the graph using a clean teardown sequence: + 1) Drop all queries associated with the graph + 2) Drop the graph itself Args: name: Name of the graph to delete @@ -1505,184 +1523,77 @@ def delete_database(self, name: str): try: logger.debug(f"Attempting to drop graph '{name}'") - # First, try to drop all queries associated with the graph - # Try multiple approaches to ensure queries are dropped - queries_dropped = False - try: - with self._ensure_graph_context(name): - # Get all installed queries for this graph - try: - queries = self._get_installed_queries() - if queries: - logger.info( - f"Dropping {len(queries)} queries from graph '{name}'" - ) - for query_name in queries: - try: - # Try DROP QUERY with IF EXISTS to avoid errors - drop_query_cmd = f"USE GRAPH {name}\nDROP QUERY {query_name} IF EXISTS" - self._execute_gsql(drop_query_cmd) - logger.debug( - f"Dropped query '{query_name}' from graph '{name}'" - ) - queries_dropped = True - except Exception: - # Try without IF EXISTS for older TigerGraph versions + # The order matters for a clean teardown + cleanup_script = f""" + USE GRAPH {name} + DROP QUERY * + USE GLOBAL + DROP GRAPH {name} + """ + result = self._execute_gsql(cleanup_script) + logger.info(f"Successfully dropped graph '{name}': {result}") + return result + except Exception as e: + error_str = str(e).lower() + # If the clean teardown fails, try fallback approaches + if ( + "depends on" in error_str + or "query" in error_str + or "not exist" in error_str + ): + logger.warning( + f"Clean teardown failed for graph '{name}': {e}. " + f"Attempting fallback cleanup." + ) + # Fallback: Try to drop queries individually, then drop graph + try: + with self._ensure_graph_context(name): + try: + queries = self._get_installed_queries() + if queries: + logger.info( + f"Dropping {len(queries)} queries from graph '{name}'" + ) + for query_name in queries: try: - drop_query_cmd = ( - f"USE GRAPH {name}\nDROP QUERY {query_name}" - ) + drop_query_cmd = f"USE GRAPH {name}\nDROP QUERY {query_name} IF EXISTS" self._execute_gsql(drop_query_cmd) logger.debug( f"Dropped query '{query_name}' from graph '{name}'" ) - queries_dropped = True - except Exception as qe2: - logger.warning( - f"Could not drop query '{query_name}' from graph '{name}': {qe2}" - ) - except Exception as e: - logger.debug(f"Could not list queries for graph '{name}': {e}") - except Exception as e: - logger.debug( - f"Could not access graph '{name}' to drop queries: {e}. " - f"Graph may not exist or queries may not be accessible." 
- ) + except Exception: + # Try without IF EXISTS for older TigerGraph versions + try: + drop_query_cmd = f"USE GRAPH {name}\nDROP QUERY {query_name}" + self._execute_gsql(drop_query_cmd) + except Exception as qe2: + logger.debug( + f"Could not drop query '{query_name}': {qe2}" + ) + except Exception as e2: + logger.debug( + f"Could not list queries for graph '{name}': {e2}" + ) - # If we couldn't drop queries through the API, try direct GSQL - if not queries_dropped: - try: - # Try to drop queries using GSQL directly - list_queries_cmd = f"USE GRAPH {name}\nSHOW QUERY *" - result = self._execute_gsql(list_queries_cmd) - # Parse result to get query names and drop them - # This is a fallback if getInstalledQueries() doesn't work - except Exception as e: - logger.debug( - f"Could not list queries via GSQL for graph '{name}': {e}" + # Now try to drop the graph + drop_command = f"USE GLOBAL\nDROP GRAPH {name}" + result = self._execute_gsql(drop_command) + logger.info( + f"Successfully dropped graph '{name}' via fallback: {result}" ) - - # Now try to drop the graph - # First, try to clear all data from the graph to avoid dependency issues - try: - with self._ensure_graph_context(name): - # Clear all vertices to remove dependencies - try: - vertex_types = self._get_vertex_types() - for v_type in vertex_types: - try: - self._delete_vertices(v_type) - logger.debug( - f"Cleared vertices of type '{v_type}' from graph '{name}'" - ) - except Exception as ve: - logger.debug( - f"Could not clear vertices '{v_type}': {ve}" - ) - except Exception as e: - logger.debug(f"Could not clear vertices: {e}") - except Exception as e: - logger.debug(f"Could not access graph context to clear data: {e}") - - try: - # Use the graph first to ensure we're working with the right graph - drop_command = f"USE GRAPH {name}\nDROP GRAPH {name}" - result = self._execute_gsql(drop_command) - logger.info(f"Successfully dropped graph '{name}': {result}") - return result - except Exception as e: - error_str = str(e).lower() - # If graph has dependencies (queries, etc.), try to continue anyway - # The graph structure might still be partially cleaned - if "depends on" in error_str or "query" in error_str: + return result + except Exception as fallback_error: logger.warning( - f"Could not fully drop graph '{name}' due to dependencies: {e}. " - f"Attempting to continue - graph may be partially cleaned." + f"Fallback cleanup also failed for graph '{name}': {fallback_error}. " + f"Graph may be partially cleaned or may not exist." ) # Don't raise - allow the process to continue # The schema creation will handle existing types return None - else: - error_msg = f"Could not drop graph '{name}'. 
Error: {e}" - logger.error(error_msg) - raise RuntimeError(error_msg) from e - - # Fallback 1: Attempt to disassociate edge and vertex types from graph - # DO NOT drop global vertex/edge types as they might be used by other graphs - try: - with self._ensure_graph_context(name): - # Disassociate edge types from graph (but don't drop them globally) - try: - edge_types = self._get_edge_types() - except Exception: - edge_types = [] - - for e_type in edge_types: - # Only disassociate from graph, don't drop globally - # ALTER GRAPH requires USE GRAPH context - try: - drop_edge_cmd = f"USE GRAPH {name}\nALTER GRAPH {name} DROP DIRECTED EDGE {e_type}" - self._execute_gsql(drop_edge_cmd) - logger.debug( - f"Disassociated edge type '{e_type}' from graph '{name}'" - ) - except Exception as e: - logger.debug( - f"Could not disassociate edge type '{e_type}' from graph '{name}': {e}" - ) - # Continue - edge might not be associated or graph might not exist - - # Disassociate vertex types from graph (but don't drop them globally) - try: - vertex_types = self._get_vertex_types() - except Exception: - vertex_types = [] - - for v_type in vertex_types: - # Only clear data from this graph's vertices, don't drop vertex type globally - # Clear data first to avoid dependency issues - try: - self._delete_vertices(v_type) - logger.debug( - f"Cleared vertices of type '{v_type}' from graph '{name}'" - ) - except Exception as e: - logger.debug( - f"Could not clear vertices of type '{v_type}' from graph '{name}': {e}" - ) - # Disassociate from graph (best-effort) - # ALTER GRAPH requires USE GRAPH context - try: - drop_vertex_cmd = f"USE GRAPH {name}\nALTER GRAPH {name} DROP VERTEX {v_type}" - self._execute_gsql(drop_vertex_cmd) - logger.debug( - f"Disassociated vertex type '{v_type}' from graph '{name}'" - ) - except Exception as e: - logger.debug( - f"Could not disassociate vertex type '{v_type}' from graph '{name}': {e}" - ) - # Continue - vertex might not be associated or graph might not exist - except Exception as e3: - logger.warning( - f"Could not disassociate schema types from graph '{name}': {e3}. Proceeding to data clear." - ) - - # Fallback 2: Clear all data (if any remain) - try: - with self._ensure_graph_context(name): - vertex_types = self._get_vertex_types() - for v_type in vertex_types: - result = self._delete_vertices(v_type) - logger.debug(f"Cleared vertices of type {v_type}: {result}") - logger.info(f"Cleared all data from graph '{name}'") - except Exception as e2: - logger.warning( - f"Could not clear data from graph '{name}': {e2}. Graph may not exist." - ) - - except Exception as e: - logger.error(f"Error deleting database '{name}': {e}") + else: + error_msg = f"Could not drop graph '{name}'. Error: {e}" + logger.error(error_msg) + raise RuntimeError(error_msg) from e @_wrap_tg_exception def execute(self, query, **kwargs): diff --git a/graflo/db/postgres/fuzzy_matcher.py b/graflo/hq/fuzzy_matcher.py similarity index 85% rename from graflo/db/postgres/fuzzy_matcher.py rename to graflo/hq/fuzzy_matcher.py index a66bc5b..2fc74ab 100644 --- a/graflo/db/postgres/fuzzy_matcher.py +++ b/graflo/hq/fuzzy_matcher.py @@ -8,7 +8,7 @@ class FuzzyMatcher: - """Improved fuzzy matcher with multiple matching strategies. + """Improved fuzzy matcher with multiple matching strategies and optional caching. Uses a combination of matching techniques: 1. Exact matching (case-insensitive) @@ -16,20 +16,30 @@ class FuzzyMatcher: 3. Sequence similarity (difflib) 4. Prefix/suffix matching 5. 
Common pattern matching (handles id, fk, etc.) + + Supports optional caching to improve performance when processing multiple fragments. """ - def __init__(self, vertex_names: list[str], threshold: float = 0.6): + def __init__( + self, vertex_names: list[str], threshold: float = 0.8, enable_cache: bool = True + ): """Initialize the fuzzy matcher. Args: vertex_names: List of vertex table names to match against threshold: Similarity threshold (0.0 to 1.0) + enable_cache: Whether to enable result caching (default: True) """ self.vertex_names = vertex_names self.threshold = threshold + self._enable_cache = enable_cache # Pre-compute lowercase versions for efficiency self._vertex_lower_map = {vn.lower(): vn for vn in vertex_names} self._vertex_lower_list = list(self._vertex_lower_map.keys()) + # Initialize cache if enabled + self._cache: dict[str, str | None] = {} + if enable_cache: + self._build_cache() def match(self, fragment: str) -> tuple[str | None, float]: """Match a fragment against vertex names using multiple strategies. @@ -45,9 +55,22 @@ def match(self, fragment: str) -> tuple[str | None, float]: fragment_lower = fragment.lower() + # Check cache first if enabled + if self._enable_cache and fragment_lower in self._cache: + cached_match = self._cache[fragment_lower] + if cached_match is not None: + return ( + cached_match, + 1.0, + ) # Cached matches are treated as high confidence + return (None, 0.0) + # Strategy 1: Exact match (highest priority, returns immediately) if fragment_lower in self._vertex_lower_map: - return (self._vertex_lower_map[fragment_lower], 1.0) + result = (self._vertex_lower_map[fragment_lower], 1.0) + if self._enable_cache: + self._cache[fragment_lower] = result[0] + return result best_match = None best_score = 0.0 @@ -74,7 +97,14 @@ def match(self, fragment: str) -> tuple[str | None, float]: # Return match only if above threshold if best_score >= self.threshold: - return (best_match, best_score) + result = (best_match, best_score) + if self._enable_cache: + self._cache[fragment_lower] = best_match + return result + + # Cache miss result + if self._enable_cache: + self._cache[fragment_lower] = None return (None, 0.0) def _substring_match(self, fragment_lower: str) -> tuple[str | None, float]: @@ -214,27 +244,6 @@ def _pattern_match(self, fragment_lower: str) -> tuple[str | None, float]: return (best_match, best_score) - -class FuzzyMatchCache: - """Cache for fuzzy matching fragments to vertex names. - - Pre-computes fuzzy matches for all fragments to avoid redundant computations. - This significantly improves performance when processing multiple tables. - """ - - def __init__(self, vertex_names: list[str], threshold: float = 0.6): - """Initialize the fuzzy match cache. - - Args: - vertex_names: List of vertex table names to match against - threshold: Similarity threshold (0.0 to 1.0) - """ - self.vertex_names = vertex_names - self.threshold = threshold - self._matcher = FuzzyMatcher(vertex_names, threshold) - self._cache: dict[str, str | None] = {} - self._build_cache() - def _build_cache(self) -> None: """Pre-compute fuzzy matches for common patterns.""" # Pre-compute exact matches (case-insensitive) @@ -249,21 +258,16 @@ def _build_cache(self) -> None: def get_match(self, fragment: str) -> str | None: """Get cached fuzzy match for a fragment, computing if not cached. + Convenience method that returns only the match (not the score). + Uses caching when enabled. 
+ Args: fragment: Fragment to match Returns: Best matching vertex name or None if no match above threshold """ - fragment_lower = fragment.lower() - - # Check cache first - if fragment_lower in self._cache: - return self._cache[fragment_lower] - - # Compute match if not cached using improved matcher - match, _ = self._matcher.match(fragment) - self._cache[fragment_lower] = match + match, _ = self.match(fragment) return match def batch_match(self, fragments: list[str]) -> dict[str, str | None]: diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py index b717ff2..e82319f 100644 --- a/graflo/hq/graph_engine.py +++ b/graflo/hq/graph_engine.py @@ -48,22 +48,27 @@ def __init__( def infer_schema( self, - postgres_conn: PostgresConnection, + postgres_config: PostgresConfig, schema_name: str | None = None, + fuzzy_threshold: float = 0.8, ) -> Schema: """Infer a graflo Schema from PostgreSQL database. Args: - postgres_conn: PostgresConnection instance + postgres_config: PostgresConfig instance schema_name: Schema name to introspect (defaults to config schema_name or 'public') + fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8) Returns: Schema: Inferred schema with vertices, edges, and resources """ - inferencer = InferenceManager( - conn=postgres_conn, target_db_flavor=self.target_db_flavor - ) - return inferencer.infer_complete_schema(schema_name=schema_name) + with PostgresConnection(postgres_config) as postgres_conn: + inferencer = InferenceManager( + conn=postgres_conn, + target_db_flavor=self.target_db_flavor, + fuzzy_threshold=fuzzy_threshold, + ) + return inferencer.infer_complete_schema(schema_name=schema_name) def create_patterns( self, diff --git a/graflo/hq/inferencer.py b/graflo/hq/inferencer.py index b610940..0432d67 100644 --- a/graflo/hq/inferencer.py +++ b/graflo/hq/inferencer.py @@ -16,12 +16,14 @@ def __init__( self, conn: PostgresConnection, target_db_flavor: DBFlavor = DBFlavor.ARANGO, + fuzzy_threshold: float = 0.8, ): """Initialize the PostgreSQL inference manager. Args: conn: PostgresConnection instance target_db_flavor: Target database flavor for schema sanitization + fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8) """ self.target_db_flavor = target_db_flavor self.sanitizer = SchemaSanitizer(target_db_flavor) @@ -29,7 +31,7 @@ def __init__( self.inferencer = PostgresSchemaInferencer( db_flavor=target_db_flavor, conn=conn ) - self.mapper = PostgresResourceMapper() + self.mapper = PostgresResourceMapper(fuzzy_threshold=fuzzy_threshold) def introspect(self, schema_name: str | None = None): """Introspect PostgreSQL schema. 
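For reference, a sketch of what `GraphEngine.infer_schema()` now does under the hood, and how `InferenceManager` could be driven directly when a live connection is already open. The import path for `InferenceManager` is assumed from the file location in this diff; thresholds and schema names are illustrative:

```python
from graflo.db.connection.onto import PostgresConfig
from graflo.db.postgres import PostgresConnection
from graflo.hq.inferencer import InferenceManager
from graflo.onto import DBFlavor

conf = PostgresConfig.from_docker_env()

# Equivalent to GraphEngine(target_db_flavor=DBFlavor.ARANGO).infer_schema(conf, ...):
# the connection is opened, used, and closed inside the with-block.
with PostgresConnection(conf) as conn:
    manager = InferenceManager(
        conn=conn,
        target_db_flavor=DBFlavor.ARANGO,
        fuzzy_threshold=0.8,
    )
    schema = manager.infer_complete_schema(schema_name="public")
```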
@@ -71,7 +73,9 @@ def create_resources( list[Resource]: List of Resources for PostgreSQL tables """ return self.mapper.map_tables_to_resources( - introspection_result, schema.vertex_config, self.sanitizer + introspection_result, + schema.vertex_config, + fuzzy_threshold=self.mapper.fuzzy_threshold, ) def infer_complete_schema(self, schema_name: str | None = None) -> Schema: diff --git a/test/db/postgres/test_relation_identification.py b/test/db/postgres/test_relation_identification.py index a2210a3..669b3fb 100644 --- a/test/db/postgres/test_relation_identification.py +++ b/test/db/postgres/test_relation_identification.py @@ -12,7 +12,7 @@ infer_edge_vertices_from_table_name, split_by_separator, ) -from graflo.db.postgres.fuzzy_matcher import FuzzyMatchCache +from graflo.hq.fuzzy_matcher import FuzzyMatcher class TestRelationIdentification: @@ -475,7 +475,7 @@ def test_match_vertices_from_table_fragments(self): """Test matching vertices from table name fragments.""" table_fragments = ["rel", "user", "purchases", "product"] vertex_names = ["user", "product"] - match_cache = FuzzyMatchCache(vertex_names) + matcher = FuzzyMatcher(vertex_names, threshold=0.6, enable_cache=True) ( source_idx, @@ -483,7 +483,7 @@ def test_match_vertices_from_table_fragments(self): source_vertex, target_vertex, matched_set, - ) = _match_vertices_from_table_fragments(table_fragments, match_cache) + ) = _match_vertices_from_table_fragments(table_fragments, matcher) # Should match source from left (user at index 1) assert source_idx == 1 @@ -499,7 +499,7 @@ def test_match_vertices_from_table_fragments_self_reference(self): """Test matching vertices when source and target are the same.""" table_fragments = ["user", "follows", "user"] vertex_names = ["user"] - match_cache = FuzzyMatchCache(vertex_names) + matcher = FuzzyMatcher(vertex_names, threshold=0.6, enable_cache=True) ( source_idx, @@ -507,7 +507,7 @@ def test_match_vertices_from_table_fragments_self_reference(self): source_vertex, target_vertex, matched_set, - ) = _match_vertices_from_table_fragments(table_fragments, match_cache) + ) = _match_vertices_from_table_fragments(table_fragments, matcher) # Should match source from left assert source_idx == 0 @@ -521,11 +521,11 @@ def test_match_vertices_from_key_fragments(self): """Test matching vertices from key fragments.""" key_fragments = ["cluster", "host", "id"] vertex_names = ["cluster", "host"] - match_cache = FuzzyMatchCache(vertex_names) + matcher = FuzzyMatcher(vertex_names, threshold=0.6, enable_cache=True) matched_set = set() matched_vertices, key_matched = _match_vertices_from_key_fragments( - key_fragments, match_cache, matched_set, None, None + key_fragments, matcher, matched_set, None, None ) # Should match cluster and host @@ -540,11 +540,11 @@ def test_match_vertices_from_key_fragments_with_existing(self): """Test matching when some vertices already matched from table name.""" key_fragments = ["cluster", "host"] vertex_names = ["cluster", "host", "user"] - match_cache = FuzzyMatchCache(vertex_names) + matcher = FuzzyMatcher(vertex_names, threshold=0.6, enable_cache=True) matched_set = {"user"} # Already matched from table name matched_vertices, key_matched = _match_vertices_from_key_fragments( - key_fragments, match_cache, matched_set, "user", None + key_fragments, matcher, matched_set, "user", None ) # Should include user (from table name) and cluster/host (from keys) diff --git a/test/db/postgres/test_schema_inference.py b/test/db/postgres/test_schema_inference.py index b5c2dbf..94b0e43 100644 
--- a/test/db/postgres/test_schema_inference.py +++ b/test/db/postgres/test_schema_inference.py @@ -7,16 +7,18 @@ - Resource creation """ +from unittest.mock import patch + from graflo.hq import GraphEngine from graflo.onto import DBFlavor -def test_infer_schema_from_postgres(postgres_conn, load_mock_schema): +def test_infer_schema_from_postgres(conn_conf, load_mock_schema): """Test that infer_schema_from_postgres correctly infers schema from PostgreSQL.""" _ = load_mock_schema # Ensure schema is loaded engine = GraphEngine(target_db_flavor=DBFlavor.ARANGO) - schema = engine.infer_schema(postgres_conn, schema_name="public") + schema = engine.infer_schema(conn_conf, schema_name="public") # Verify schema structure assert schema is not None @@ -164,7 +166,7 @@ def test_infer_schema_from_postgres(postgres_conn, load_mock_schema): print("=" * 80) -def test_infer_schema_with_pg_catalog_fallback(postgres_conn, load_mock_schema): +def test_infer_schema_with_pg_catalog_fallback(conn_conf, load_mock_schema): """Test that schema inference works correctly when using pg_catalog fallback methods. This test simulates a scenario where information_schema is unavailable or unreliable, @@ -176,13 +178,15 @@ def test_infer_schema_with_pg_catalog_fallback(postgres_conn, load_mock_schema): # Mock the _check_information_schema_reliable method to return False # This forces the use of pg_catalog fallback throughout the introspection process - original_check = postgres_conn._check_information_schema_reliable - postgres_conn._check_information_schema_reliable = lambda schema_name: False + # Since infer_schema() creates its own connection, we need to patch the class method + from graflo.db.postgres import PostgresConnection - try: + with patch.object( + PostgresConnection, "_check_information_schema_reliable", return_value=False + ): # Test that infer_schema_from_postgres works with pg_catalog fallback engine = GraphEngine(target_db_flavor=DBFlavor.ARANGO) - schema = engine.infer_schema(postgres_conn, schema_name="public") + schema = engine.infer_schema(conn_conf, schema_name="public") # Verify schema structure assert schema is not None, "Schema should be inferred" @@ -346,7 +350,3 @@ def test_infer_schema_with_pg_catalog_fallback(postgres_conn, load_mock_schema): print(f" - {r.name} (actors: {', '.join(actor_types)})") print("=" * 80) - - finally: - # Restore original method - postgres_conn._check_information_schema_reliable = original_check
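Finally, the REST-based query discovery added to `_get_installed_queries` in `graflo/db/tigergraph/conn.py` above boils down to a small parsing rule over the `/endpoints?dynamic=true` response. A standalone sketch of that rule, with the response shape assumed from the comments in the diff:

```python
def extract_installed_queries(endpoints: dict, graph_name: str) -> list[str]:
    """Pull query names out of endpoint paths like 'POST /query/<graph>/<query>'."""
    prefix = f"/query/{graph_name}/"
    names: list[str] = []
    for path in endpoints:
        idx = path.find(prefix)
        if idx < 0:
            continue
        tail = path[idx + len(prefix):]
        name = (tail.split()[0] if tail else "").rstrip("/").strip()
        if name and name not in names:
            names.append(name)
    return names

# Illustrative response keys, not a real TigerGraph payload:
print(extract_installed_queries(
    {"POST /query/accounting/triples": {}, "GET /endpoints": {}},
    "accounting",
))  # -> ['triples']
```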