From 85b7d67dd690c5e56e506d4bacafe4c14ef8c35d Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Sun, 1 Feb 2026 23:23:19 +0100 Subject: [PATCH 1/5] added discard disconnected vertices --- graflo/architecture/actor.py | 84 +++++++++++++++++++++++++++++++- graflo/architecture/schema.py | 55 +++++++++++++++++++++ graflo/architecture/vertex.py | 16 ++++++ graflo/hq/graph_engine.py | 8 ++- pyproject.toml | 8 +-- test/architecture/test_actor.py | 54 ++++++++++++++++++++ test/architecture/test_schema.py | 36 ++++++++++++++ test/architecture/test_vertex.py | 16 ++++++ uv.lock | 17 ++----- 9 files changed, 276 insertions(+), 18 deletions(-) diff --git a/graflo/architecture/actor.py b/graflo/architecture/actor.py index 2b74c6d8..35b3ddee 100644 --- a/graflo/architecture/actor.py +++ b/graflo/architecture/actor.py @@ -27,7 +27,7 @@ from functools import reduce from pathlib import Path from types import MappingProxyType -from typing import Any, Type +from typing import Any, Callable, Generic, Type, TypeVar from graflo.architecture.actor_util import ( add_blank_collections, @@ -916,8 +916,10 @@ def fetch_actors(self, level, edges): } ) +A = TypeVar("A", bound=Actor) -class ActorWrapper: + +class ActorWrapper(Generic[A]): """Wrapper class for managing actor instances. This class provides a unified interface for creating and managing different types @@ -1278,3 +1280,81 @@ def collect_actors(self) -> list[Actor]: for descendant in self.actor.descendants: actors.extend(descendant.collect_actors()) return actors + + def find_descendants( + self, + predicate: Callable[[ActorWrapper[Any]], bool] | None = None, + *, + actor_type: type[A] | None = None, + **attr_in: Any, + ) -> list[ActorWrapper]: + """Find all descendant ActorWrappers matching the given criteria. + + Traverses the actor tree and returns every ActorWrapper whose wrapped + actor matches. You can use a custom predicate, or filter by actor type + and attribute membership in sets. + + Args: + predicate: Optional callable(ActorWrapper) -> bool. If given, only + descendants for which predicate returns True are included. + actor_type: If given, only descendants whose wrapped actor is an + instance of this type are included (e.g. VertexActor, + TransformActor). + **attr_in: Attribute filters. Each key is an attribute name on the + wrapped actor; the value must be a set. A descendant is included + only if getattr(actor, key, None) is in that set. Examples: + name={"user", "product"} for VertexActor, + vertex={"target_a", "target_b"} for TransformActor (target_vertex). + + Returns: + list[ActorWrapper]: All matching descendants in the tree. 
+ + Example: + >>> # All VertexActor descendants with name in {"user", "product"} + >>> wrappers.find_descendants(actor_type=VertexActor, name={"user", "product"}) + >>> # All TransformActor descendants with target_vertex in a set + >>> wrappers.find_descendants(actor_type=TransformActor, vertex={"a", "b"}) + >>> # Custom predicate + >>> wrappers.find_descendants(predicate=lambda w: isinstance(w.actor, VertexActor) and w.actor.name == "user") + """ + if predicate is None: + + def _predicate(w: ActorWrapper) -> bool: + if actor_type is not None and not isinstance(w.actor, actor_type): + return False + for attr, allowed in attr_in.items(): + if allowed is None: + continue + val = getattr(w.actor, attr, None) + if val not in allowed: + return False + return True + + predicate = _predicate + + result: list[ActorWrapper] = [] + if isinstance(self.actor, DescendActor): + for d in self.actor.descendants: + if predicate(d): + result.append(d) + result.extend(d.find_descendants(predicate=predicate)) + return result + + def remove_descendants_if(self, predicate: Callable[[ActorWrapper], bool]) -> None: + """Remove descendants for which predicate returns True. + + Mutates the tree in place: for each DescendActor, filters its + descendants to exclude wrappers matching the predicate, after + recursing into each descendant. Use with find_descendants to + remove actors that reference disconnected vertices. + + Args: + predicate: Callable(ActorWrapper) -> bool. Descendants for + which this returns True are removed from the tree. + """ + if isinstance(self.actor, DescendActor): + for d in list(self.actor.descendants): + d.remove_descendants_if(predicate=predicate) + self.actor._descendants[:] = [ + d for d in self.actor.descendants if not predicate(d) + ] diff --git a/graflo/architecture/schema.py b/graflo/architecture/schema.py index 7bca6b92..c3ff58b3 100644 --- a/graflo/architecture/schema.py +++ b/graflo/architecture/schema.py @@ -31,6 +31,7 @@ import logging from collections import Counter +from graflo.architecture.actor import EdgeActor, TransformActor, VertexActor from graflo.architecture.edge import EdgeConfig from graflo.architecture.resource import Resource from graflo.architecture.transform import ProtoTransform @@ -133,3 +134,57 @@ def fetch_resource(self, name: str | None = None) -> Resource: else: raise ValueError("Empty resource container 😕") return _current_resource + + def remove_disconnected_vertices(self) -> None: + """Remove vertices that do not take part in any relation (disconnected). + + Builds the set of vertex names that appear as source or target of any + edge, then removes from VertexConfig all other vertices. For each + resource, finds actors that reference disconnected vertices (via + find_descendants) and removes them from the actor tree. Resources + whose root actor references only disconnected vertices are removed. + + Mutates this schema in place. 
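+
+        Example:
+            >>> # Illustrative sketch, mirroring the accompanying test: "publication"
+            >>> # appears in no edge, so it is dropped together with the actors that
+            >>> # reference it (schema_dict is assumed, as in test_remove_disconnected_vertices).
+            >>> sch = Schema.from_dict(schema_dict)
+            >>> sch.remove_disconnected_vertices()
+            >>> "publication" in sch.vertex_config.vertex_set
+            False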
+ """ + connected = self.edge_config.vertices + disconnected = self.vertex_config.vertex_set - connected + if not disconnected: + return + + self.vertex_config.remove_vertices(disconnected) + + def mentions_disconnected(wrapper): + actor = wrapper.actor + if isinstance(actor, VertexActor): + return actor.name in disconnected + if isinstance(actor, TransformActor): + return actor.vertex is not None and actor.vertex in disconnected + if isinstance(actor, EdgeActor): + return ( + actor.edge.source in disconnected + or actor.edge.target in disconnected + ) + return False + + to_drop: list[Resource] = [] + for resource in self.resources: + root = resource.root + to_remove = set( + root.find_descendants(actor_type=VertexActor, name=disconnected) + + root.find_descendants(actor_type=TransformActor, vertex=disconnected) + + root.find_descendants( + predicate=lambda w: isinstance(w.actor, EdgeActor) + and ( + w.actor.edge.source in disconnected + or w.actor.edge.target in disconnected + ), + ) + ) + if mentions_disconnected(root): + to_drop.append(resource) + continue + root.remove_descendants_if(lambda w: w in to_remove) + + for r in to_drop: + self.resources.remove(r) + self._resources.pop(r.name, None) diff --git a/graflo/architecture/vertex.py b/graflo/architecture/vertex.py index 2d21140d..f913be1e 100644 --- a/graflo/architecture/vertex.py +++ b/graflo/architecture/vertex.py @@ -528,6 +528,22 @@ def filters(self, vertex_name) -> list[Expression]: else: return [] + def remove_vertices(self, names: set[str]) -> None: + """Remove vertices by name. + + Removes vertices from the configuration and from blank_vertices + when present. Mutates the instance in place. + + Args: + names: Set of vertex names to remove + """ + if not names: + return + self.vertices[:] = [v for v in self.vertices if v.name not in names] + for n in names: + self._vertices_map.pop(n, None) + self.blank_vertices[:] = [b for b in self.blank_vertices if b not in names] + def update_vertex(self, v: Vertex): """Update vertex configuration. diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py index 61f8cea2..cdc83ed8 100644 --- a/graflo/hq/graph_engine.py +++ b/graflo/hq/graph_engine.py @@ -53,6 +53,7 @@ def infer_schema( postgres_config: PostgresConfig, schema_name: str | None = None, fuzzy_threshold: float = 0.8, + discard_disconnected_vertices: bool = False, ) -> Schema: """Infer a graflo Schema from PostgreSQL database. @@ -60,6 +61,8 @@ def infer_schema( postgres_config: PostgresConfig instance schema_name: Schema name to introspect (defaults to config schema_name or 'public') fuzzy_threshold: Similarity threshold for fuzzy matching (0.0 to 1.0, default 0.8) + discard_disconnected_vertices: If True, remove vertices that do not take part in + any relation (and resources/actors that reference them). Default False. 
Returns: Schema: Inferred schema with vertices, edges, and resources @@ -70,7 +73,10 @@ def infer_schema( target_db_flavor=self.target_db_flavor, fuzzy_threshold=fuzzy_threshold, ) - return inferencer.infer_complete_schema(schema_name=schema_name) + schema = inferencer.infer_complete_schema(schema_name=schema_name) + if discard_disconnected_vertices: + schema.remove_disconnected_vertices() + return schema def create_patterns( self, diff --git a/pyproject.toml b/pyproject.toml index b4c6d16a..d2d4977d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "neo4j>=5.22.0,<6", "networkx~=3.3", "pandas-stubs==2.3.0.250703", - "pandas>=2.0.3,<3", + "pandas>=2.0.3,<4", "psycopg2-binary>=2.9.11", "pydantic-settings>=2.12.0", "pydantic>=2.12.5", @@ -46,8 +46,7 @@ dependencies = [ "redis>=5.0.0", "requests>=2.31.0", "sqlalchemy>=2.0.0", - "strenum>=0.4.15", - "suthing>=0.5.0", + "suthing>=0.5.1", "urllib3>=2.0.0", "xmltodict>=0.14.2,<0.15" ] @@ -68,6 +67,9 @@ manage_dbs = "graflo.cli.manage_dbs:manage_dbs" plot_schema = "graflo.cli.plot_schema:plot_schema" xml2json = "graflo.cli.plot_schema:xml2json" +[project.urls] +Homepage = "https://github.com/growgraph/graflo" + [tool.hatch.build.targets.sdist] include = ["graflo"] diff --git a/test/architecture/test_actor.py b/test/architecture/test_actor.py index 4fb99c4a..e322d724 100644 --- a/test/architecture/test_actor.py +++ b/test/architecture/test_actor.py @@ -5,6 +5,7 @@ DescendActor, EdgeActor, TransformActor, + VertexActor, ) from graflo.architecture.edge import EdgeConfig from graflo.architecture.onto import ActionContext, LocationIndex, VertexRep @@ -133,3 +134,56 @@ def test_resource_deb_compact(resource_deb_compact, data_deb, schema_vc_deb): ("package", "package", "suggests"): 2, ("package", "package", "breaks"): 1, } + + +def test_find_descendants_by_vertex_name(resource_descend, schema_vc_openalex): + """find_descendants returns VertexActors whose name is in the given set.""" + anw = ActorWrapper(**resource_descend) + anw.finish_init(vertex_config=schema_vc_openalex) + # Tree: DescendActor -> [DescendActor(apply with name "a"), VertexActor("work")] + by_name_work = anw.find_descendants(actor_type=VertexActor, name={"work"}) + assert len(by_name_work) == 1 + assert ( + isinstance(by_name_work[0].actor, VertexActor) + and by_name_work[0].actor.name == "work" + ) + by_name_empty = anw.find_descendants(actor_type=VertexActor, name={"nonexistent"}) + assert len(by_name_empty) == 0 + + +def test_find_descendants_by_type_and_predicate( + resource_openalex_works, schema_vc_openalex +): + """find_descendants with actor_type and custom predicate works on nested tree.""" + anw = ActorWrapper(*resource_openalex_works) + anw.finish_init(vertex_config=schema_vc_openalex, transforms={}) + all_vertex_work = anw.find_descendants(actor_type=VertexActor, name={"work"}) + assert len(all_vertex_work) == 2 # top-level and under referenced_works + assert all( + isinstance(w.actor, VertexActor) and w.actor.name == "work" + for w in all_vertex_work + ) + all_transform = anw.find_descendants(actor_type=TransformActor) + assert len(all_transform) == 3 # keep_suffix_id variants and under referenced_works + by_predicate = anw.find_descendants( + predicate=lambda w: isinstance(w.actor, VertexActor) and w.actor.name == "work" + ) + assert by_predicate == all_vertex_work + + +def test_find_descendants_transform_by_target_vertex( + resource_collision, vertex_config_collision +): + """find_descendants returns TransformActors whose target_vertex is 
in the given set.""" + anw = ActorWrapper(*resource_collision) + anw.finish_init(vertex_config=vertex_config_collision, transforms={}) + by_vertex = anw.find_descendants(actor_type=TransformActor, vertex={"person"}) + assert len(by_vertex) == 1 + assert ( + isinstance(by_vertex[0].actor, TransformActor) + and by_vertex[0].actor.vertex == "person" + ) + by_vertex_empty = anw.find_descendants( + actor_type=TransformActor, vertex={"nonexistent"} + ) + assert len(by_vertex_empty) == 0 diff --git a/test/architecture/test_schema.py b/test/architecture/test_schema.py index c6e7881f..f6f5c394 100644 --- a/test/architecture/test_schema.py +++ b/test/architecture/test_schema.py @@ -1,5 +1,6 @@ import logging +from graflo.architecture.actor import VertexActor from graflo.architecture.resource import Resource from graflo.architecture.schema import Schema @@ -34,3 +35,38 @@ def test_s(schema): sd = schema("ibes") sr = Schema.from_dict(sd) assert sr.general.name == "ibes" + + +def test_remove_disconnected_vertices(vertex_config_kg, edge_config_kg): + """remove_disconnected_vertices drops vertices not in any edge and related actors.""" + # vertex_config_kg has publication, entity, mention + # edge_config_kg has edges: entity-entity, entity-entity (aux), mention-entity + # So connected = {entity, mention}; publication is disconnected + schema_dict = { + "vertex_config": vertex_config_kg, + "edge_config": edge_config_kg, + "resources": [ + { + "resource_name": "r1", + "apply": [ + {"vertex": "publication"}, + {"vertex": "entity"}, + {"source": "mention", "target": "entity"}, + ], + }, + ], + "general": {"name": "kg"}, + } + sch = Schema.from_dict(schema_dict) + assert sch.vertex_config.vertex_set == {"publication", "entity", "mention"} + assert len(sch.resources) == 1 + root = sch.resources[0].root + assert len(root.find_descendants(actor_type=VertexActor, name={"publication"})) == 1 + + sch.remove_disconnected_vertices() + + assert sch.vertex_config.vertex_set == {"entity", "mention"} + assert "publication" not in sch.vertex_config.vertex_set + # Resource r1 should still exist but without the VertexActor(publication) + assert len(sch.resources) == 1 + assert len(root.find_descendants(actor_type=VertexActor, name={"publication"})) == 0 diff --git a/test/architecture/test_vertex.py b/test/architecture/test_vertex.py index 38e5a14b..cbd179ff 100644 --- a/test/architecture/test_vertex.py +++ b/test/architecture/test_vertex.py @@ -386,3 +386,19 @@ def test_vertex_config_fields_with_db_flavor(): # fields_names() returns strings field_names = config.fields_names("user") assert field_names == ["id", "name"] + + +def test_vertex_config_remove_vertices(): + """Test VertexConfig.remove_vertices removes vertices and updates blank_vertices.""" + v1 = Vertex.from_dict({"name": "a", "fields": ["id"]}) + v2 = Vertex.from_dict({"name": "b", "fields": ["id"]}) + v3 = Vertex.from_dict({"name": "c", "fields": ["id"]}) + config = VertexConfig( + vertices=[v1, v2, v3], + blank_vertices=["b"], + ) + assert config.vertex_set == {"a", "b", "c"} + config.remove_vertices({"b", "c"}) + assert config.vertex_set == {"a"} + assert config.vertices[0].name == "a" + assert config.blank_vertices == [] diff --git a/uv.lock b/uv.lock index ecb38e73..afeb73d9 100644 --- a/uv.lock +++ b/uv.lock @@ -367,7 +367,6 @@ dependencies = [ { name = "redis" }, { name = "requests" }, { name = "sqlalchemy" }, - { name = "strenum" }, { name = "suthing" }, { name = "urllib3" }, { name = "xmltodict" }, @@ -405,7 +404,7 @@ requires-dist = [ { name = "ijson", 
specifier = ">=3.2.3,<4" }, { name = "neo4j", specifier = ">=5.22.0,<6" }, { name = "networkx", specifier = "~=3.3" }, - { name = "pandas", specifier = ">=2.0.3,<3" }, + { name = "pandas", specifier = ">=2.0.3,<4" }, { name = "pandas-stubs", specifier = "==2.3.0.250703" }, { name = "psycopg2-binary", specifier = ">=2.9.11" }, { name = "pydantic", specifier = ">=2.12.5" }, @@ -416,8 +415,7 @@ requires-dist = [ { name = "redis", specifier = ">=5.0.0" }, { name = "requests", specifier = ">=2.31.0" }, { name = "sqlalchemy", specifier = ">=2.0.0" }, - { name = "strenum", specifier = ">=0.4.15" }, - { name = "suthing", specifier = ">=0.5.0" }, + { name = "suthing", specifier = ">=0.5.1" }, { name = "urllib3", specifier = ">=2.0.0" }, { name = "xmltodict", specifier = ">=0.14.2,<0.15" }, ] @@ -449,7 +447,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/cb/48e964c452ca2b92175a9b2dca037a553036cb053ba69e284650ce755f13/greenlet-3.3.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:e29f3018580e8412d6aaf5641bb7745d38c85228dacf51a73bd4e26ddf2a6a8e", size = 274908, upload-time = "2025-12-04T14:23:26.435Z" }, { url = "https://files.pythonhosted.org/packages/28/da/38d7bff4d0277b594ec557f479d65272a893f1f2a716cad91efeb8680953/greenlet-3.3.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a687205fb22794e838f947e2194c0566d3812966b41c78709554aa883183fb62", size = 577113, upload-time = "2025-12-04T14:50:05.493Z" }, { url = "https://files.pythonhosted.org/packages/3c/f2/89c5eb0faddc3ff014f1c04467d67dee0d1d334ab81fadbf3744847f8a8a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:4243050a88ba61842186cb9e63c7dfa677ec146160b0efd73b855a3d9c7fcf32", size = 590338, upload-time = "2025-12-04T14:57:41.136Z" }, - { url = "https://files.pythonhosted.org/packages/80/d7/db0a5085035d05134f8c089643da2b44cc9b80647c39e93129c5ef170d8f/greenlet-3.3.0-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:670d0f94cd302d81796e37299bcd04b95d62403883b24225c6b5271466612f45", size = 601098, upload-time = "2025-12-04T15:07:11.898Z" }, { url = "https://files.pythonhosted.org/packages/dc/a6/e959a127b630a58e23529972dbc868c107f9d583b5a9f878fb858c46bc1a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6cb3a8ec3db4a3b0eb8a3c25436c2d49e3505821802074969db017b87bc6a948", size = 590206, upload-time = "2025-12-04T14:26:01.254Z" }, { url = "https://files.pythonhosted.org/packages/48/60/29035719feb91798693023608447283b266b12efc576ed013dd9442364bb/greenlet-3.3.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:2de5a0b09eab81fc6a382791b995b1ccf2b172a9fec934747a7a23d2ff291794", size = 1550668, upload-time = "2025-12-04T15:04:22.439Z" }, { url = "https://files.pythonhosted.org/packages/0a/5f/783a23754b691bfa86bd72c3033aa107490deac9b2ef190837b860996c9f/greenlet-3.3.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4449a736606bd30f27f8e1ff4678ee193bc47f6ca810d705981cfffd6ce0d8c5", size = 1615483, upload-time = "2025-12-04T14:27:28.083Z" }, @@ -457,7 +454,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" }, { url = 
"https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" }, { url = "https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" }, - { url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" }, { url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" }, { url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = "2025-12-04T15:04:23.757Z" }, { url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" }, @@ -465,7 +461,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" }, { url = "https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" }, { url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" }, - { url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" }, { url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = 
"sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = "2025-12-04T14:26:03.669Z" }, { url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" }, { url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" }, @@ -473,7 +468,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/7c/f0a6d0ede2c7bf092d00bc83ad5bafb7e6ec9b4aab2fbdfa6f134dc73327/greenlet-3.3.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:60c2ef0f578afb3c8d92ea07ad327f9a062547137afe91f38408f08aacab667f", size = 275671, upload-time = "2025-12-04T14:23:05.267Z" }, { url = "https://files.pythonhosted.org/packages/44/06/dac639ae1a50f5969d82d2e3dd9767d30d6dbdbab0e1a54010c8fe90263c/greenlet-3.3.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a5d554d0712ba1de0a6c94c640f7aeba3f85b3a6e1f2899c11c2c0428da9365", size = 646360, upload-time = "2025-12-04T14:50:10.026Z" }, { url = "https://files.pythonhosted.org/packages/e0/94/0fb76fe6c5369fba9bf98529ada6f4c3a1adf19e406a47332245ef0eb357/greenlet-3.3.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:3a898b1e9c5f7307ebbde4102908e6cbfcb9ea16284a3abe15cab996bee8b9b3", size = 658160, upload-time = "2025-12-04T14:57:45.41Z" }, - { url = "https://files.pythonhosted.org/packages/93/79/d2c70cae6e823fac36c3bbc9077962105052b7ef81db2f01ec3b9bf17e2b/greenlet-3.3.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:dcd2bdbd444ff340e8d6bdf54d2f206ccddbb3ccfdcd3c25bf4afaa7b8f0cf45", size = 671388, upload-time = "2025-12-04T15:07:15.789Z" }, { url = "https://files.pythonhosted.org/packages/b8/14/bab308fc2c1b5228c3224ec2bf928ce2e4d21d8046c161e44a2012b5203e/greenlet-3.3.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5773edda4dc00e173820722711d043799d3adb4f01731f40619e07ea2750b955", size = 660166, upload-time = "2025-12-04T14:26:05.099Z" }, { url = "https://files.pythonhosted.org/packages/4b/d2/91465d39164eaa0085177f61983d80ffe746c5a1860f009811d498e7259c/greenlet-3.3.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ac0549373982b36d5fd5d30beb8a7a33ee541ff98d2b502714a09f1169f31b55", size = 1615193, upload-time = "2025-12-04T15:04:27.041Z" }, { url = "https://files.pythonhosted.org/packages/42/1b/83d110a37044b92423084d52d5d5a3b3a73cafb51b547e6d7366ff62eff1/greenlet-3.3.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d198d2d977460358c3b3a4dc844f875d1adb33817f0613f663a656f463764ccc", size = 1683653, upload-time = "2025-12-04T14:27:32.366Z" }, @@ -481,7 +475,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/66/bd6317bc5932accf351fc19f177ffba53712a202f9df10587da8df257c7e/greenlet-3.3.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d6ed6f85fae6cdfdb9ce04c9bf7a08d666cfcfb914e7d006f44f840b46741931", size = 282638, upload-time = "2025-12-04T14:25:20.941Z" }, { url = 
"https://files.pythonhosted.org/packages/30/cf/cc81cb030b40e738d6e69502ccbd0dd1bced0588e958f9e757945de24404/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d9125050fcf24554e69c4cacb086b87b3b55dc395a8b3ebe6487b045b2614388", size = 651145, upload-time = "2025-12-04T14:50:11.039Z" }, { url = "https://files.pythonhosted.org/packages/9c/ea/1020037b5ecfe95ca7df8d8549959baceb8186031da83d5ecceff8b08cd2/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:87e63ccfa13c0a0f6234ed0add552af24cc67dd886731f2261e46e241608bee3", size = 654236, upload-time = "2025-12-04T14:57:47.007Z" }, - { url = "https://files.pythonhosted.org/packages/69/cc/1e4bae2e45ca2fa55299f4e85854606a78ecc37fead20d69322f96000504/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:2662433acbca297c9153a4023fe2161c8dcfdcc91f10433171cf7e7d94ba2221", size = 662506, upload-time = "2025-12-04T15:07:16.906Z" }, { url = "https://files.pythonhosted.org/packages/57/b9/f8025d71a6085c441a7eaff0fd928bbb275a6633773667023d19179fe815/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3c6e9b9c1527a78520357de498b0e709fb9e2f49c3a513afd5a249007261911b", size = 653783, upload-time = "2025-12-04T14:26:06.225Z" }, { url = "https://files.pythonhosted.org/packages/f6/c7/876a8c7a7485d5d6b5c6821201d542ef28be645aa024cfe1145b35c120c1/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:286d093f95ec98fdd92fcb955003b8a3d054b4e2cab3e2707a5039e7b50520fd", size = 1614857, upload-time = "2025-12-04T15:04:28.484Z" }, { url = "https://files.pythonhosted.org/packages/4f/dc/041be1dff9f23dac5f48a43323cd0789cb798342011c19a248d9c9335536/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6c10513330af5b8ae16f023e8ddbfb486ab355d04467c4679c5cfe4659975dd9", size = 1676034, upload-time = "2025-12-04T14:27:33.531Z" }, @@ -1654,7 +1647,7 @@ wheels = [ [[package]] name = "suthing" -version = "0.5.0" +version = "0.5.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "dataclass-wizard" }, @@ -1664,9 +1657,9 @@ dependencies = [ { name = "pyyaml" }, { name = "strenum" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/32/ba/13b1fdafa2b2f34ce888e637749cb85069c663ab57683c421f2c6462ecb7/suthing-0.5.0.tar.gz", hash = "sha256:d10ef14141438fdd2b4ad1f10a5d22f47df533cabe744fd11f9ae6c17d70b168", size = 88672, upload-time = "2025-11-09T22:53:25.922Z" } +sdist = { url = "https://files.pythonhosted.org/packages/93/23/58936045eda0cbca5d501734ac1e8f76c6113903147f57856a080152105f/suthing-0.5.1.tar.gz", hash = "sha256:60ce491f94959ddcbb305abc3f82e05e2c9faef7397f7d4aa0836760ab1bd693", size = 7435, upload-time = "2026-02-01T19:42:43.865Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ad/a4/6797de8ca92643efc01c3eee5ed97c121412355430f3da744e5630d191b5/suthing-0.5.0-py3-none-any.whl", hash = "sha256:4fc73727f3e859c726d2225d62e064d9717b4bc9e59296ee5012997227e6a9b4", size = 8376, upload-time = "2025-11-09T22:53:24.815Z" }, + { url = "https://files.pythonhosted.org/packages/bb/e1/d97ad9adb2d773e33de2c756d7ea689ae2629515c08526b6ab9037424a79/suthing-0.5.1-py3-none-any.whl", hash = "sha256:d07a719b9ad4cfbd3ae931ce697d1a05b0447219d50493a036af618b329a629c", size = 8364, upload-time = "2026-02-01T19:42:45.005Z" }, ] [[package]] From 61ae97029c9ec348423a8c77ea8b470c97493ba4 Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Sun, 1 Feb 2026 23:25:34 +0100 
Subject: [PATCH 2/5] output_config -> target_db_config --- README.md | 2 +- docs/examples/example-5.md | 4 ++-- docs/getting_started/quickstart.md | 4 ++-- examples/1-ingest-csv/ingest.py | 2 +- examples/2-ingest-self-references/ingest.py | 2 +- examples/3-ingest-csv-edge-weights/ingest.py | 2 +- examples/4-ingest-neo4j/ingest.py | 2 +- examples/5-ingest-postgres/ingest.py | 2 +- graflo/hq/graph_engine.py | 8 ++++---- 9 files changed, 14 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 2f9e033e..fe6a2185 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,7 @@ ingestion_params = IngestionParams( engine.define_and_ingest( schema=schema, - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, clean_start=False, # Set to True to wipe existing database diff --git a/docs/examples/example-5.md b/docs/examples/example-5.md index 65e64833..0936b046 100644 --- a/docs/examples/example-5.md +++ b/docs/examples/example-5.md @@ -378,7 +378,7 @@ ingestion_params = IngestionParams( engine.define_and_ingest( schema=schema, - output_config=target_config, # Target graph database config + target_db_config=target_config, # Target graph database config patterns=patterns, # PostgreSQL table patterns ingestion_params=ingestion_params, clean_start=True, # Clear existing data first @@ -442,7 +442,7 @@ ingestion_params = IngestionParams( # Use GraphEngine to define schema and ingest data engine.define_and_ingest( schema=schema, - output_config=target_config, + target_db_config=target_config, patterns=patterns, ingestion_params=ingestion_params, clean_start=True, # Clear existing data first diff --git a/docs/getting_started/quickstart.md b/docs/getting_started/quickstart.md index edd475c6..01164ead 100644 --- a/docs/getting_started/quickstart.md +++ b/docs/getting_started/quickstart.md @@ -87,7 +87,7 @@ ingestion_params = IngestionParams( engine.define_and_ingest( schema=schema, - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, clean_start=False, # Set to True to wipe existing database @@ -166,7 +166,7 @@ ingestion_params = IngestionParams( engine.define_and_ingest( schema=schema, - output_config=arango_config, # Target graph database + target_db_config=arango_config, # Target graph database patterns=patterns, # Source PostgreSQL tables ingestion_params=ingestion_params, clean_start=False, # Set to True to wipe existing database diff --git a/examples/1-ingest-csv/ingest.py b/examples/1-ingest-csv/ingest.py index 726e406b..97648e85 100644 --- a/examples/1-ingest-csv/ingest.py +++ b/examples/1-ingest-csv/ingest.py @@ -54,7 +54,7 @@ ingestion_params = IngestionParams(clean_start=True) engine.define_and_ingest( schema=schema, - output_config=conn_conf, + target_db_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params, ) diff --git a/examples/2-ingest-self-references/ingest.py b/examples/2-ingest-self-references/ingest.py index e1384fd7..30c11a1b 100644 --- a/examples/2-ingest-self-references/ingest.py +++ b/examples/2-ingest-self-references/ingest.py @@ -45,7 +45,7 @@ ingestion_params = IngestionParams(clean_start=True) engine.define_and_ingest( schema=schema, - output_config=conn_conf, + target_db_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params, ) diff --git 
a/examples/3-ingest-csv-edge-weights/ingest.py b/examples/3-ingest-csv-edge-weights/ingest.py index 758a744b..85eba4d7 100644 --- a/examples/3-ingest-csv-edge-weights/ingest.py +++ b/examples/3-ingest-csv-edge-weights/ingest.py @@ -53,7 +53,7 @@ ingestion_params = IngestionParams(clean_start=True) engine.define_and_ingest( schema=schema, - output_config=conn_conf, + target_db_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params, ) diff --git a/examples/4-ingest-neo4j/ingest.py b/examples/4-ingest-neo4j/ingest.py index 73c6401d..cd96a84f 100644 --- a/examples/4-ingest-neo4j/ingest.py +++ b/examples/4-ingest-neo4j/ingest.py @@ -61,7 +61,7 @@ ) engine.define_and_ingest( schema=schema, - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, ) diff --git a/examples/5-ingest-postgres/ingest.py b/examples/5-ingest-postgres/ingest.py index 90fb26c5..fb11275f 100644 --- a/examples/5-ingest-postgres/ingest.py +++ b/examples/5-ingest-postgres/ingest.py @@ -115,7 +115,7 @@ # Note: ingestion will create its own PostgreSQL connections per table internally engine.define_and_ingest( schema=schema, - output_config=conn_conf, + target_db_config=conn_conf, patterns=patterns, ingestion_params=IngestionParams( clean_start=False diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py index cdc83ed8..cc7af861 100644 --- a/graflo/hq/graph_engine.py +++ b/graflo/hq/graph_engine.py @@ -134,7 +134,7 @@ def define_schema( def define_and_ingest( self, schema: Schema, - output_config: DBConfig, + target_db_config: DBConfig, patterns: "Patterns | None" = None, ingestion_params: IngestionParams | None = None, clean_start: bool | None = None, @@ -146,7 +146,7 @@ def define_and_ingest( Args: schema: Schema configuration for the graph - output_config: Target database connection configuration + target_db_config: Target database connection configuration patterns: Patterns instance mapping resources to data sources. If None, defaults to empty Patterns() ingestion_params: IngestionParams instance with ingestion configuration. 
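A minimal call-site sketch for the renamed keyword (illustrative only; engine, schema, conn_conf, and patterns are assumed to be built as in the README example above):

    from graflo.hq.caster import IngestionParams

    engine.define_and_ingest(
        schema=schema,
        target_db_config=conn_conf,  # renamed from output_config
        patterns=patterns,
        ingestion_params=IngestionParams(clean_start=False),
    )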
@@ -165,7 +165,7 @@ def define_and_ingest( # Define schema first self.define_schema( schema=schema, - output_config=output_config, + output_config=target_db_config, clean_start=clean_start, ) @@ -178,7 +178,7 @@ def define_and_ingest( # Then ingest data self.ingest( schema=schema, - output_config=output_config, + output_config=target_db_config, patterns=patterns, ingestion_params=ingestion_params, ) From 1f8a9f5b71b79be41513634ef77f546dd84b1c13 Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Sun, 1 Feb 2026 23:56:38 +0100 Subject: [PATCH 3/5] refactored clean_start into recreate_schema and clear_data --- README.md | 12 +-- docs/examples/example-1.md | 4 +- docs/examples/example-2.md | 4 +- docs/examples/example-3.md | 4 +- docs/examples/example-4.md | 4 +- docs/examples/example-5.md | 8 +- docs/getting_started/quickstart.md | 18 ++--- docs/reference/data_source/index.md | 2 +- examples/1-ingest-csv/ingest.py | 2 +- examples/2-ingest-self-references/ingest.py | 2 +- examples/3-ingest-csv-edge-weights/ingest.py | 2 +- examples/4-ingest-neo4j/ingest.py | 2 +- examples/5-ingest-postgres/ingest.py | 6 +- graflo/__init__.py | 2 +- graflo/cli/ingest.py | 8 +- graflo/db/arango/conn.py | 53 +++++++++++-- graflo/db/conn.py | 25 ++++++- graflo/db/falkordb/__init__.py | 2 +- graflo/db/falkordb/conn.py | 47 ++++++++++-- graflo/db/memgraph/__init__.py | 2 +- graflo/db/memgraph/conn.py | 41 ++++++++-- graflo/db/neo4j/conn.py | 46 ++++++++++-- graflo/db/tigergraph/conn.py | 38 +++++++--- graflo/hq/caster.py | 15 ++-- graflo/hq/graph_engine.py | 78 +++++++++++--------- test/conftest.py | 10 +-- test/db/arangos/test_db_index.py | 2 +- test/db/tigergraphs/test_db_creation.py | 4 +- test/db/tigergraphs/test_db_index.py | 2 +- 29 files changed, 318 insertions(+), 127 deletions(-) diff --git a/README.md b/README.md index fe6a2185..dd59d263 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,7 @@ from graflo.hq import GraphEngine # Option 1: Use GraphEngine for schema definition and ingestion (recommended) engine = GraphEngine() ingestion_params = IngestionParams( - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) # max_items=1000, # Optional: limit number of items to process # batch_size=10000, # Optional: customize batch size ) @@ -139,17 +139,17 @@ engine.define_and_ingest( target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) ) # Option 2: Use Caster directly (schema must be defined separately) # from graflo.hq import GraphEngine # engine = GraphEngine() -# engine.define_schema(schema=schema, output_config=conn_conf, clean_start=False) +# engine.define_schema(schema=schema, target_db_config=conn_conf, recreate_schema=False) # # caster = Caster(schema) # caster.ingest( -# output_config=conn_conf, +# target_db_config=conn_conf, # patterns=patterns, # ingestion_params=ingestion_params, # ) @@ -178,8 +178,8 @@ schema = engine.infer_schema( target_config = ArangoConfig.from_docker_env() engine.define_schema( schema=schema, - output_config=target_config, - clean_start=False, + target_db_config=target_config, + recreate_schema=False, ) # Use the inferred schema with Caster for ingestion diff --git a/docs/examples/example-1.md b/docs/examples/example-1.md index 
12d8c886..99829d11 100644 --- a/docs/examples/example-1.md +++ b/docs/examples/example-1.md @@ -121,12 +121,12 @@ from graflo.hq.caster import IngestionParams caster = Caster(schema) ingestion_params = IngestionParams( - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) # max_items=1000, # Optional: limit number of items to process ) caster.ingest( - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, ) diff --git a/docs/examples/example-2.md b/docs/examples/example-2.md index 9e648964..fed7bbe8 100644 --- a/docs/examples/example-2.md +++ b/docs/examples/example-2.md @@ -132,11 +132,11 @@ patterns.add_file_pattern( from graflo.hq.caster import IngestionParams ingestion_params = IngestionParams( - clean_start=True, # Wipe existing database before ingestion + recreate_schema=True, # Wipe existing schema before defining and ingesting ) caster.ingest( - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, ) diff --git a/docs/examples/example-3.md b/docs/examples/example-3.md index 644fb5fb..9a5616f7 100644 --- a/docs/examples/example-3.md +++ b/docs/examples/example-3.md @@ -120,11 +120,11 @@ from graflo.hq.caster import IngestionParams caster = Caster(schema) ingestion_params = IngestionParams( - clean_start=True, # Wipe existing database before ingestion + recreate_schema=True, # Wipe existing schema before defining and ingesting ) caster.ingest( - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, ) diff --git a/docs/examples/example-4.md b/docs/examples/example-4.md index f2f5eeac..70cad859 100644 --- a/docs/examples/example-4.md +++ b/docs/examples/example-4.md @@ -214,11 +214,11 @@ from graflo.hq.caster import IngestionParams caster = Caster(schema) ingestion_params = IngestionParams( - clean_start=True, # Wipe existing database before ingestion + recreate_schema=True, # Wipe existing schema before defining and ingesting ) caster.ingest( - output_config=conn_conf, # Target database config + target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, ) diff --git a/docs/examples/example-5.md b/docs/examples/example-5.md index 0936b046..328f411b 100644 --- a/docs/examples/example-5.md +++ b/docs/examples/example-5.md @@ -373,7 +373,7 @@ from graflo.hq.caster import IngestionParams # Use GraphEngine for schema definition and ingestion engine = GraphEngine() ingestion_params = IngestionParams( - clean_start=True, # Clear existing data first + recreate_schema=True, # Drop existing schema and define new one before ingesting ) engine.define_and_ingest( @@ -381,7 +381,7 @@ engine.define_and_ingest( target_db_config=target_config, # Target graph database config patterns=patterns, # PostgreSQL table patterns ingestion_params=ingestion_params, - clean_start=True, # Clear existing data first + recreate_schema=True, # Drop existing schema and define new one before ingesting ) ``` @@ -436,7 +436,7 @@ patterns = engine.create_patterns(postgres_conf, schema_name="public") # Step 7: Define schema and ingest data ingestion_params = IngestionParams( 
- clean_start=True, # Clear existing data first + recreate_schema=True, # Drop existing schema and define new one before ingesting ) # Use GraphEngine to define schema and ingest data @@ -445,7 +445,7 @@ engine.define_and_ingest( target_db_config=target_config, patterns=patterns, ingestion_params=ingestion_params, - clean_start=True, # Clear existing data first + recreate_schema=True, # Drop existing schema and define new one before ingesting ) print("\n" + "=" * 80) diff --git a/docs/getting_started/quickstart.md b/docs/getting_started/quickstart.md index 01164ead..9466133f 100644 --- a/docs/getting_started/quickstart.md +++ b/docs/getting_started/quickstart.md @@ -82,7 +82,7 @@ from graflo.hq import GraphEngine # Option 1: Use GraphEngine for schema definition and ingestion (recommended) engine = GraphEngine() ingestion_params = IngestionParams( - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) ) engine.define_and_ingest( @@ -90,16 +90,16 @@ engine.define_and_ingest( target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) ) # Option 2: Use Caster directly (schema must be defined separately) # engine = GraphEngine() -# engine.define_schema(schema=schema, output_config=conn_conf, clean_start=False) +# engine.define_schema(schema=schema, target_db_config=conn_conf, recreate_schema=False) # # caster = Caster(schema) # caster.ingest( -# output_config=conn_conf, +# target_db_config=conn_conf, # patterns=patterns, # ingestion_params=ingestion_params, # ) @@ -112,7 +112,7 @@ The `Patterns` class maps resource names (from `Schema`) to their physical data - **TablePattern**: For PostgreSQL table resources with connection configuration The `ingest()` method takes: -- `output_config`: Target graph database configuration (where to write the graph) +- `target_db_config`: Target graph database configuration (where to write the graph) - `patterns`: Source data patterns (where to read data from - files or database tables) ## 🚀 Using PostgreSQL Tables as Data Sources @@ -161,7 +161,7 @@ arango_config = ArangoConfig.from_docker_env() # Target graph database # Use GraphEngine for schema definition and ingestion engine = GraphEngine() ingestion_params = IngestionParams( - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) ) engine.define_and_ingest( @@ -169,7 +169,7 @@ engine.define_and_ingest( target_db_config=arango_config, # Target graph database patterns=patterns, # Source PostgreSQL tables ingestion_params=ingestion_params, - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema (script halts if schema exists) ) ``` @@ -211,8 +211,8 @@ from graflo.hq import GraphEngine engine = GraphEngine() engine.define_schema( schema=schema, - output_config=conn_conf, - clean_start=False, + target_db_config=conn_conf, + recreate_schema=False, ) # Then ingest using Caster diff --git a/docs/reference/data_source/index.md b/docs/reference/data_source/index.md index 1037c458..4f0936e7 100644 --- a/docs/reference/data_source/index.md +++ b/docs/reference/data_source/index.md @@ -170,7 +170,7 @@ caster = 
Caster(schema) ingestion_params = IngestionParams( batch_size=1000, # Process 1000 items per batch - clean_start=False, # Set to True to wipe existing database + recreate_schema=False, # Set to True to drop and redefine schema ) caster.ingest_data_sources( diff --git a/examples/1-ingest-csv/ingest.py b/examples/1-ingest-csv/ingest.py index 97648e85..a3d008ed 100644 --- a/examples/1-ingest-csv/ingest.py +++ b/examples/1-ingest-csv/ingest.py @@ -51,7 +51,7 @@ # Create GraphEngine and define schema + ingest in one operation engine = GraphEngine(target_db_flavor=db_type) -ingestion_params = IngestionParams(clean_start=True) +ingestion_params = IngestionParams(clear_data=True) engine.define_and_ingest( schema=schema, target_db_config=conn_conf, diff --git a/examples/2-ingest-self-references/ingest.py b/examples/2-ingest-self-references/ingest.py index 30c11a1b..1a0b7c0b 100644 --- a/examples/2-ingest-self-references/ingest.py +++ b/examples/2-ingest-self-references/ingest.py @@ -42,7 +42,7 @@ # Create GraphEngine and define schema + ingest in one operation engine = GraphEngine(target_db_flavor=db_type) -ingestion_params = IngestionParams(clean_start=True) +ingestion_params = IngestionParams(clear_data=True) engine.define_and_ingest( schema=schema, target_db_config=conn_conf, diff --git a/examples/3-ingest-csv-edge-weights/ingest.py b/examples/3-ingest-csv-edge-weights/ingest.py index 85eba4d7..a93f1a43 100644 --- a/examples/3-ingest-csv-edge-weights/ingest.py +++ b/examples/3-ingest-csv-edge-weights/ingest.py @@ -50,7 +50,7 @@ # Create GraphEngine and define schema + ingest in one operation engine = GraphEngine(target_db_flavor=db_type) -ingestion_params = IngestionParams(clean_start=True) +ingestion_params = IngestionParams(clear_data=True) engine.define_and_ingest( schema=schema, target_db_config=conn_conf, diff --git a/examples/4-ingest-neo4j/ingest.py b/examples/4-ingest-neo4j/ingest.py index cd96a84f..59a5ab88 100644 --- a/examples/4-ingest-neo4j/ingest.py +++ b/examples/4-ingest-neo4j/ingest.py @@ -56,7 +56,6 @@ # Create GraphEngine and define schema + ingest in one operation engine = GraphEngine(target_db_flavor=db_type) ingestion_params = IngestionParams( - clean_start=True, # max_items=5, ) engine.define_and_ingest( @@ -64,4 +63,5 @@ target_db_config=conn_conf, # Target database config patterns=patterns, # Source data patterns ingestion_params=ingestion_params, + recreate_schema=True, # Wipe existing schema before defining and ingesting ) diff --git a/examples/5-ingest-postgres/ingest.py b/examples/5-ingest-postgres/ingest.py index fb11275f..5271657c 100644 --- a/examples/5-ingest-postgres/ingest.py +++ b/examples/5-ingest-postgres/ingest.py @@ -117,10 +117,8 @@ schema=schema, target_db_config=conn_conf, patterns=patterns, - ingestion_params=IngestionParams( - clean_start=False - ), # clean_start handled by define_and_ingest - clean_start=True, # Clean existing data before defining schema + ingestion_params=IngestionParams(clear_data=False), + recreate_schema=True, # Drop existing schema and define new one before ingesting ) print("\n" + "=" * 80) diff --git a/graflo/__init__.py b/graflo/__init__.py index 0d906fd2..7589f2ba 100644 --- a/graflo/__init__.py +++ b/graflo/__init__.py @@ -15,7 +15,7 @@ Example: >>> from graflo.db.manager import ConnectionManager >>> with ConnectionManager(config) as conn: - ... conn.init_db(schema, clean_start=True) + ... conn.init_db(schema, recreate_schema=True) ... 
conn.upsert_docs_batch(docs, "users") """ diff --git a/graflo/cli/ingest.py b/graflo/cli/ingest.py index f2f0c0ca..65a75519 100644 --- a/graflo/cli/ingest.py +++ b/graflo/cli/ingest.py @@ -163,12 +163,12 @@ def ingest( limit_files=limit_files, ) - # Define schema first (if clean_start is requested) + # Define schema first (if recreate_schema is requested) if fresh_start: engine.define_schema( schema=schema, - output_config=conn_conf, - clean_start=True, + target_db_config=conn_conf, + recreate_schema=True, ) # Validate that either source_path or data_source_config_path is provided @@ -211,7 +211,7 @@ def ingest( # Fall back to file-based ingestion using GraphEngine engine.ingest( schema=schema, - output_config=conn_conf, + target_db_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params, ) diff --git a/graflo/db/arango/conn.py b/graflo/db/arango/conn.py index 51d85f8b..d6cad87d 100644 --- a/graflo/db/arango/conn.py +++ b/graflo/db/arango/conn.py @@ -19,7 +19,7 @@ Example: >>> conn = ArangoConnection(config) - >>> conn.init_db(schema, clean_start=True) + >>> conn.init_db(schema, recreate_schema=True) >>> conn.upsert_docs_batch(docs, "users", match_keys=["email"]) """ @@ -39,7 +39,7 @@ from graflo.architecture.vertex import VertexConfig from graflo.db.arango.query import fetch_fields_query from graflo.db.arango.util import render_filters -from graflo.db.conn import Connection +from graflo.db.conn import Connection, SchemaExistsError from graflo.db.util import get_data_from_cursor, json_serializer from graflo.filter.onto import Clause from graflo.onto import AggregationType @@ -187,15 +187,19 @@ def close(self) -> None: # self.conn.close() pass - def init_db(self, schema: Schema, clean_start: bool) -> None: + def init_db(self, schema: Schema, recreate_schema: bool) -> None: """Initialize ArangoDB with the given schema. Checks if the database exists and creates it if it doesn't. Uses schema.general.name if database is not set in config. + If the schema/graph already exists and recreate_schema is False, raises + SchemaExistsError and the script halts. + Args: schema: Schema containing graph structure definitions - clean_start: If True, delete all existing vertex and edge classes before initialization + recreate_schema: If True, drop existing vertex/edge classes and define new ones. + If False and any collections or graphs exist, raises SchemaExistsError. """ # Determine database name: use config.database if set, otherwise use schema.general.name db_name = self.config.database @@ -248,13 +252,31 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: raise try: - if clean_start: + # Check if schema/graph already exists (any non-system collection or graph) + graphs_result = self.conn.graphs() + collections_result = self.conn.collections() + has_graphs = isinstance(graphs_result, list) and len(graphs_result) > 0 + non_system = [] + if isinstance(collections_result, list): + for c in collections_result: + if isinstance(c, dict): + name_value = cast(dict[str, Any], c).get("name") + if isinstance(name_value, str) and name_value[0] != "_": + non_system.append(name_value) + has_collections = len(non_system) > 0 + if (has_graphs or has_collections) and not recreate_schema: + raise SchemaExistsError( + f"Schema/graph already exists in database '{db_name}'. " + "Set recreate_schema=True to replace, or use clear_data=True before ingestion." 
+ ) + + if recreate_schema: try: self.delete_graph_structure((), (), delete_all=True) logger.debug(f"Cleaned database '{db_name}' for fresh start") except Exception as clean_error: logger.warning( - f"Error during clean_start for database '{db_name}': {clean_error}", + f"Error during recreate_schema for database '{db_name}': {clean_error}", exc_info=True, ) # Continue - may be first run or already clean @@ -278,6 +300,8 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: exc_info=True, ) raise + except SchemaExistsError: + raise except Exception as e: logger.error( f"Error during database schema initialization for '{db_name}': {e}", @@ -285,6 +309,23 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: ) raise + def clear_data(self, schema: Schema) -> None: + """Remove all data from collections without dropping the schema. + + Truncates vertex and edge collections that belong to the schema. + """ + vc = schema.vertex_config + for v in vc.vertex_set: + cname = vc.vertex_dbname(v) + if self.conn.has_collection(cname): + self.conn.collection(cname).truncate() + logger.debug(f"Truncated vertex collection '{cname}'") + for edge in schema.edge_config.edges_list(include_aux=True): + cname = edge.database_name + if cname and self.conn.has_collection(cname): + self.conn.collection(cname).truncate() + logger.debug(f"Truncated edge collection '{cname}'") + def define_schema(self, schema: Schema) -> None: """Define ArangoDB collections based on schema. diff --git a/graflo/db/conn.py b/graflo/db/conn.py index 5d8cbd86..a51bdda1 100644 --- a/graflo/db/conn.py +++ b/graflo/db/conn.py @@ -63,6 +63,14 @@ ConnectionType = TypeVar("ConnectionType", bound="Connection") +class SchemaExistsError(RuntimeError): + """Raised when schema/graph already exists and recreate_schema is False. + + Set recreate_schema=True to replace the existing schema, or use clear_data=True + before ingestion to only clear data without touching the schema. + """ + + class Connection(abc.ABC): """Abstract base class for database connections. @@ -157,12 +165,25 @@ def delete_graph_structure( pass @abc.abstractmethod - def init_db(self, schema: Schema, clean_start: bool) -> None: + def init_db(self, schema: Schema, recreate_schema: bool) -> None: """Initialize the database with the given schema. + If the schema/graph already exists and recreate_schema is False, raises + SchemaExistsError and the script halts. + Args: schema: Schema to initialize the database with - clean_start: Whether to clean existing data + recreate_schema: If True, drop existing schema and define new one. + If False and schema/graph already exists, raises SchemaExistsError. + """ + pass + + @abc.abstractmethod + def clear_data(self, schema: Schema) -> None: + """Remove all data from the graph without dropping or changing the schema. + + Args: + schema: Schema describing the graph (used to identify collections/labels). 
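+
+        Example:
+            >>> # Illustrative sketch: wipe all documents/nodes but keep the
+            >>> # collections, labels, and indexes defined by the schema
+            >>> # (conn is any concrete Connection, schema the graflo Schema).
+            >>> conn.clear_data(schema)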
""" pass diff --git a/graflo/db/falkordb/__init__.py b/graflo/db/falkordb/__init__.py index 0ea6b6cd..48f0671c 100644 --- a/graflo/db/falkordb/__init__.py +++ b/graflo/db/falkordb/__init__.py @@ -17,7 +17,7 @@ >>> from graflo.db.connection import FalkordbConfig >>> config = FalkordbConfig(uri="redis://localhost:6379", database="mygraph") >>> conn = FalkordbConnection(config) - >>> conn.init_db(schema, clean_start=True) + >>> conn.init_db(schema, recreate_schema=True) """ from .conn import FalkordbConnection diff --git a/graflo/db/falkordb/conn.py b/graflo/db/falkordb/conn.py index 0867fd71..07fd1e46 100644 --- a/graflo/db/falkordb/conn.py +++ b/graflo/db/falkordb/conn.py @@ -20,7 +20,7 @@ Example: >>> conn = FalkordbConnection(config) - >>> conn.init_db(schema, clean_start=True) + >>> conn.init_db(schema, recreate_schema=True) >>> conn.upsert_docs_batch(docs, "Person", match_keys=["id"]) """ @@ -35,7 +35,7 @@ from graflo.architecture.onto import Index from graflo.architecture.schema import Schema from graflo.architecture.vertex import VertexConfig -from graflo.db.conn import Connection +from graflo.db.conn import Connection, SchemaExistsError from graflo.db.util import serialize_value from graflo.filter.onto import Expression from graflo.onto import AggregationType, ExpressionFlavor @@ -438,14 +438,18 @@ def delete_graph_structure( except Exception as e: logger.warning(f"Failed to delete graph '{graph_name}': {e}") - def init_db(self, schema: Schema, clean_start: bool) -> None: + def init_db(self, schema: Schema, recreate_schema: bool) -> None: """Initialize FalkorDB with the given schema. Uses schema.general.name if database is not set in config. + If the graph already has nodes and recreate_schema is False, raises + SchemaExistsError and the script halts. + Args: schema: Schema containing graph structure definitions - clean_start: If True, delete all existing data before initialization + recreate_schema: If True, delete all existing data before initialization. + If False and graph has nodes, raises SchemaExistsError. """ # Determine graph name: use config.database if set, otherwise use schema.general.name graph_name = self.config.database @@ -459,12 +463,36 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: self._graph_name = graph_name logger.info(f"Initialized FalkorDB graph '{graph_name}'") - if clean_start: + # Check if graph already has nodes (schema/graph exists) + try: + result = self.execute("MATCH (n) RETURN count(n) AS c") + count = 0 + if hasattr(result, "data") and result.data(): + count = result.data()[0].get("c", 0) or 0 + elif result is not None and hasattr(result, "__iter__"): + for record in result: + count = ( + record.get("c", 0) + if hasattr(record, "get") + else getattr(record, "c", 0) + ) or 0 + break + if count > 0 and not recreate_schema: + raise SchemaExistsError( + f"Schema/graph already exists in graph '{graph_name}' ({count} nodes). " + "Set recreate_schema=True to replace, or use clear_data=True before ingestion." 
+ ) + except SchemaExistsError: + raise + except Exception as e: + logger.debug(f"Could not check graph node count: {e}") + + if recreate_schema: try: self.delete_graph_structure(delete_all=True) logger.debug(f"Cleaned graph '{graph_name}' for fresh start") except Exception as e: - logger.debug(f"Clean start note for graph '{graph_name}': {e}") + logger.debug(f"Recreate schema note for graph '{graph_name}': {e}") try: self.define_indexes(schema) @@ -476,6 +504,13 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: ) raise + def clear_data(self, schema: Schema) -> None: + """Remove all data from the graph without dropping the schema. + + Deletes all nodes and relationships; labels (schema) remain. + """ + self.delete_graph_structure(delete_all=True) + def upsert_docs_batch( self, docs: list[dict[str, Any]], diff --git a/graflo/db/memgraph/__init__.py b/graflo/db/memgraph/__init__.py index a3291c8a..b22986db 100644 --- a/graflo/db/memgraph/__init__.py +++ b/graflo/db/memgraph/__init__.py @@ -17,7 +17,7 @@ >>> from graflo.db.connection import MemgraphConfig >>> config = MemgraphConfig(uri="bolt://localhost:7687") >>> conn = MemgraphConnection(config) - >>> conn.init_db(schema, clean_start=True) + >>> conn.init_db(schema, recreate_schema=True) """ from .conn import MemgraphConnection diff --git a/graflo/db/memgraph/conn.py b/graflo/db/memgraph/conn.py index 34fac402..97079133 100644 --- a/graflo/db/memgraph/conn.py +++ b/graflo/db/memgraph/conn.py @@ -86,7 +86,7 @@ from graflo.architecture.edge import Edge from graflo.architecture.schema import Schema from graflo.architecture.vertex import VertexConfig -from graflo.db.conn import Connection +from graflo.db.conn import Connection, SchemaExistsError from graflo.filter.onto import Expression from graflo.onto import AggregationType, ExpressionFlavor from graflo.onto import DBType @@ -542,26 +542,55 @@ def delete_graph_structure( except Exception as e: logger.warning(f"Failed to delete nodes with label '{label}': {e}") - def init_db(self, schema: Schema, clean_start: bool) -> None: + def init_db(self, schema: Schema, recreate_schema: bool) -> None: """Initialize Memgraph with the given schema. + If the database already has nodes and recreate_schema is False, raises + SchemaExistsError and the script halts. + Parameters ---------- schema : Schema Schema containing graph structure definitions - clean_start : bool - If True, delete all existing data before initialization + recreate_schema : bool + If True, delete all existing data before initialization. + If False and database has nodes, raises SchemaExistsError. """ assert self.conn is not None, "Connection is closed" self._database_name = schema.general.name logger.info(f"Initialized Memgraph with schema '{self._database_name}'") - if clean_start: + # Check if database already has nodes (schema/graph exists) + cursor = self.conn.cursor() + cursor.execute("MATCH (n) RETURN count(n) AS c") + row = cursor.fetchone() + cursor.close() + count = 0 + if row is not None: + count = ( + row[0] + if isinstance(row, (list, tuple)) + else getattr(row, "c", row.get("c", 0) if hasattr(row, "get") else 0) + ) + if count > 0 and not recreate_schema: + raise SchemaExistsError( + f"Schema/graph already exists ({count} nodes). " + "Set recreate_schema=True to replace, or use clear_data=True before ingestion." 
+ ) + + if recreate_schema: try: self.delete_graph_structure(delete_all=True) except Exception as e: - logger.warning(f"Error clearing data on clean_start: {e}") + logger.warning(f"Error clearing data on recreate_schema: {e}") + + def clear_data(self, schema: Schema) -> None: + """Remove all data from the graph without dropping the schema. + + Deletes all nodes and relationships; labels (schema) remain. + """ + self.delete_graph_structure(delete_all=True) def upsert_docs_batch( self, diff --git a/graflo/db/neo4j/conn.py b/graflo/db/neo4j/conn.py index 8c9940b4..fc1cdf6d 100644 --- a/graflo/db/neo4j/conn.py +++ b/graflo/db/neo4j/conn.py @@ -19,7 +19,7 @@ Example: >>> conn = Neo4jConnection(config) - >>> conn.init_db(schema, clean_start=True) + >>> conn.init_db(schema, recreate_schema=True) >>> conn.upsert_docs_batch(docs, "User", match_keys=["email"]) """ @@ -32,7 +32,7 @@ from graflo.architecture.onto import Index from graflo.architecture.schema import Schema from graflo.architecture.vertex import VertexConfig -from graflo.db.conn import Connection +from graflo.db.conn import Connection, SchemaExistsError from graflo.filter.onto import Expression from graflo.onto import AggregationType, ExpressionFlavor from graflo.onto import DBType @@ -252,16 +252,20 @@ def delete_graph_structure( q = "MATCH (n) DELETE n" self.execute(q) - def init_db(self, schema: Schema, clean_start: bool) -> None: + def init_db(self, schema: Schema, recreate_schema: bool) -> None: """Initialize Neo4j with the given schema. Checks if the database exists and creates it if it doesn't. Uses schema.general.name if database is not set in config. Note: Database creation is only supported in Neo4j Enterprise Edition. + If the database already has nodes and recreate_schema is False, raises + SchemaExistsError and the script halts. + Args: schema: Schema containing graph structure definitions - clean_start: If True, delete all existing data before initialization + recreate_schema: If True, delete all existing data before initialization. + If False and database has nodes, raises SchemaExistsError. """ # Determine database name: use config.database if set, otherwise use schema.general.name db_name = self.config.database @@ -321,13 +325,34 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: ) try: - if clean_start: + # Check if database already has nodes (schema/graph exists) + result = self.execute("MATCH (n) RETURN count(n) AS c") + count = 0 + if hasattr(result, "data"): + data = result.data() + if data: + first = data[0] + count = first.get("c", 0) or 0 + if count == 0 and hasattr(result, "__iter__"): + for record in result: + if hasattr(record, "get"): + count = record.get("c", 0) or 0 + else: + count = getattr(record, "c", 0) or 0 + break + if count > 0 and not recreate_schema: + raise SchemaExistsError( + f"Schema/graph already exists in database '{db_name}' ({count} nodes). " + "Set recreate_schema=True to replace, or use clear_data=True before ingestion." 
+ ) + + if recreate_schema: try: self.delete_database("") logger.debug(f"Cleaned database '{db_name}' for fresh start") except Exception as clean_error: logger.warning( - f"Error during clean_start for database '{db_name}': {clean_error}", + f"Error during recreate_schema for database '{db_name}': {clean_error}", exc_info=True, ) # Continue - may be first run or already clean @@ -341,6 +366,8 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: exc_info=True, ) raise + except SchemaExistsError: + raise except Exception as e: logger.error( f"Error during database schema initialization for '{db_name}': {e}", @@ -348,6 +375,13 @@ def init_db(self, schema: Schema, clean_start: bool) -> None: ) raise + def clear_data(self, schema: Schema) -> None: + """Remove all data from the graph without dropping the schema. + + Deletes all nodes and relationships; labels (schema) remain. + """ + self.delete_graph_structure((), (), delete_all=True) + def upsert_docs_batch(self, docs, class_name, match_keys, **kwargs): """Upsert a batch of nodes using Cypher. diff --git a/graflo/db/tigergraph/conn.py b/graflo/db/tigergraph/conn.py index 6b86117f..4d502061 100644 --- a/graflo/db/tigergraph/conn.py +++ b/graflo/db/tigergraph/conn.py @@ -19,7 +19,7 @@ Example: >>> conn = TigerGraphConnection(config) - >>> conn.init_db(schema, clean_start=True) + >>> conn.init_db(schema, recreate_schema=True) >>> conn.upsert_docs_batch(docs, "User", match_keys=["email"]) """ @@ -42,7 +42,7 @@ from graflo.architecture.onto import Index from graflo.architecture.schema import Schema from graflo.architecture.vertex import FieldType, Vertex, VertexConfig -from graflo.db.conn import Connection +from graflo.db.conn import Connection, SchemaExistsError from graflo.db.connection.onto import TigergraphConfig from graflo.db.tigergraph.onto import ( TIGERGRAPH_TYPE_ALIASES, @@ -2272,15 +2272,19 @@ def _define_schema_local(self, schema: Schema) -> None: ) @_wrap_tg_exception - def init_db(self, schema: Schema, clean_start: bool = False) -> None: + def init_db(self, schema: Schema, recreate_schema: bool = False) -> None: """ Initialize database with schema definition. + If the graph already exists and recreate_schema is False, raises + SchemaExistsError and the script halts. + Follows the same pattern as ArangoDB: - 1. Clean if needed - 2. Create graph if not exists - 3. Define schema locally within the graph - 4. Define indexes + 1. Halt if graph exists and recreate_schema is False + 2. Clean (drop graph) if recreate_schema + 3. Create graph if not exists + 4. Define schema locally within the graph + 5. Define indexes If any step fails, the graph will be cleaned up gracefully. """ @@ -2299,14 +2303,20 @@ def init_db(self, schema: Schema, clean_start: bool = False) -> None: _validate_tigergraph_schema_name(graph_name, "graph") try: - if clean_start: + if self.graph_exists(graph_name) and not recreate_schema: + raise SchemaExistsError( + f"Schema/graph already exists: graph '{graph_name}'. " + "Set recreate_schema=True to replace, or use clear_data=True before ingestion." 
+ ) + + if recreate_schema: try: # Only delete the current graph self.delete_database(graph_name) logger.debug(f"Cleaned graph '{graph_name}' for fresh start") except Exception as clean_error: logger.warning( - f"Error during clean_start for graph '{graph_name}': {clean_error}", + f"Error during recreate_schema for graph '{graph_name}': {clean_error}", exc_info=True, ) @@ -3026,6 +3036,16 @@ def delete_graph_structure(self, vertex_types=(), graph_names=(), delete_all=Fal except Exception as e: logger.error(f"Error in delete_graph_structure: {e}") + def clear_data(self, schema: Schema) -> None: + """Remove all data from the graph without dropping the schema. + + Deletes vertices (and their edges) for all vertex types in the schema. + """ + vc = schema.vertex_config + vertex_types = tuple(vc.vertex_dbname(v) for v in vc.vertex_set) + if vertex_types: + self.delete_graph_structure(vertex_types=vertex_types) + def _generate_upsert_payload( self, data: list[dict[str, Any]], vname: str, vindex: tuple[str, ...] ) -> dict[str, Any]: diff --git a/graflo/hq/caster.py b/graflo/hq/caster.py index 09b9d7d8..3d71765f 100644 --- a/graflo/hq/caster.py +++ b/graflo/hq/caster.py @@ -46,7 +46,8 @@ class IngestionParams(BaseModel): """Parameters for controlling the ingestion process. Attributes: - clean_start: Whether to clean the database before ingestion + clear_data: If True, remove all existing graph data before ingestion without + changing the schema. n_cores: Number of CPU cores/threads to use for parallel processing max_items: Maximum number of items to process per resource (applies to all data sources) batch_size: Size of batches for processing @@ -58,7 +59,7 @@ class IngestionParams(BaseModel): concurrent transactions well (e.g., Neo4j). Database-independent setting. """ - clean_start: bool = False + clear_data: bool = False n_cores: int = 1 max_items: int | None = None batch_size: int = 10000 @@ -93,7 +94,7 @@ def __init__( ingestion_params: IngestionParams instance with ingestion configuration. If None, creates IngestionParams from kwargs or uses defaults **kwargs: Additional configuration options (for backward compatibility): - - clean_start: Whether to clean the database before ingestion + - clear_data: Whether to clear existing data before ingestion - n_cores: Number of CPU cores/threads to use for parallel processing - max_items: Maximum number of items to process - batch_size: Size of batches for processing @@ -737,7 +738,7 @@ def _build_registry_from_patterns( def ingest( self, - output_config: DBConfig, + target_db_config: DBConfig, patterns: "Patterns | None" = None, ingestion_params: IngestionParams | None = None, ): @@ -750,7 +751,7 @@ def ingest( - IngestionParams: Parameters controlling the ingestion process Args: - output_config: Target database connection configuration (for writing graph) + target_db_config: Target database connection configuration (for writing graph) patterns: Patterns instance mapping resources to data sources If None, defaults to empty Patterns() ingestion_params: IngestionParams instance with ingestion configuration. 
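# Illustrative caller-side sketch (hypothetical `schema`, `db_conf`, `patterns` objects):
# with the rename above, Caster.ingest takes `target_db_config`, and pre-ingestion data
# clearing is requested via IngestionParams(clear_data=True) rather than clean_start.
#
#     caster = Caster(schema=schema, ingestion_params=IngestionParams(n_cores=4))
#     caster.ingest(target_db_config=db_conf, patterns=patterns)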
@@ -761,7 +762,7 @@ def ingest( ingestion_params = ingestion_params or IngestionParams() # Initialize vertex config with correct field types based on database type - db_flavor = output_config.connection_type + db_flavor = target_db_config.connection_type self.schema.vertex_config.db_flavor = db_flavor self.schema.vertex_config.finish_init() # Initialize edge config after vertex config is fully initialized @@ -774,7 +775,7 @@ def ingest( asyncio.run( self.ingest_data_sources( data_source_registry=registry, - conn_conf=output_config, + conn_conf=target_db_config, ingestion_params=ingestion_params, ) ) diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py index cc7af861..54aa0088 100644 --- a/graflo/hq/graph_engine.py +++ b/graflo/hq/graph_engine.py @@ -100,8 +100,8 @@ def create_patterns( def define_schema( self, schema: Schema, - output_config: DBConfig, - clean_start: bool = False, + target_db_config: DBConfig, + recreate_schema: bool = False, ) -> None: """Define schema in the target database. @@ -109,27 +109,34 @@ def define_schema( Some databases don't require explicit schema definition (e.g., Neo4j), but this method ensures the database is properly initialized. + If the schema/graph already exists and recreate_schema is False (default), + init_db raises SchemaExistsError and the script halts. + Args: schema: Schema configuration for the graph - output_config: Target database connection configuration - clean_start: Whether to clean the database before defining schema + target_db_config: Target database connection configuration + recreate_schema: If True, drop existing schema and define new one. + If False and schema/graph already exists, raises SchemaExistsError. """ # If effective_schema is not set, use schema.general.name as fallback - if output_config.can_be_target() and output_config.effective_schema is None: + if ( + target_db_config.can_be_target() + and target_db_config.effective_schema is None + ): schema_name = schema.general.name # Map to the appropriate field based on DB type - if output_config.connection_type == DBType.TIGERGRAPH: + if target_db_config.connection_type == DBType.TIGERGRAPH: # TigerGraph uses 'schema_name' field - output_config.schema_name = schema_name + target_db_config.schema_name = schema_name else: # ArangoDB, Neo4j use 'database' field (which maps to effective_schema) - output_config.database = schema_name + target_db_config.database = schema_name # Initialize database with schema definition # init_db() handles database/schema creation automatically # It checks if the database exists and creates it if needed - with ConnectionManager(connection_config=output_config) as db_client: - db_client.init_db(schema, clean_start) + with ConnectionManager(connection_config=target_db_config) as db_client: + db_client.init_db(schema, recreate_schema) def define_and_ingest( self, @@ -137,7 +144,8 @@ def define_and_ingest( target_db_config: DBConfig, patterns: "Patterns | None" = None, ingestion_params: IngestionParams | None = None, - clean_start: bool | None = None, + recreate_schema: bool | None = None, + clear_data: bool | None = None, ) -> None: """Define schema and ingest data into the graph database in one operation. @@ -151,34 +159,32 @@ def define_and_ingest( If None, defaults to empty Patterns() ingestion_params: IngestionParams instance with ingestion configuration. If None, uses default IngestionParams() - clean_start: Whether to clean the database before defining schema. - If None, uses ingestion_params.clean_start if provided, otherwise False. 
- Note: If clean_start is True, ingestion_params.clean_start will be - set to False to avoid double-cleaning. + recreate_schema: If True, drop existing schema and define new one. + If None, defaults to False. When False and schema already exists, + define_schema raises SchemaExistsError and the script halts. + clear_data: If True, remove existing data before ingestion (schema unchanged). + If None, uses ingestion_params.clear_data. """ ingestion_params = ingestion_params or IngestionParams() + if clear_data is None: + clear_data = ingestion_params.clear_data + if recreate_schema is None: + recreate_schema = False - # Determine clean_start value: explicit parameter > ingestion_params > False - if clean_start is None: - clean_start = ingestion_params.clean_start - - # Define schema first + # Define schema first (halts with SchemaExistsError if schema exists and recreate_schema is False) self.define_schema( schema=schema, - output_config=target_db_config, - clean_start=clean_start, + target_db_config=target_db_config, + recreate_schema=recreate_schema, ) - # If we cleaned during schema definition, don't clean again during ingestion - if clean_start: - ingestion_params = IngestionParams( - **{**ingestion_params.model_dump(), "clean_start": False} - ) - - # Then ingest data + # Then ingest data (clear_data is applied inside ingest() when ingestion_params.clear_data) + ingestion_params = IngestionParams( + **{**ingestion_params.model_dump(), "clear_data": clear_data} + ) self.ingest( schema=schema, - output_config=target_db_config, + target_db_config=target_db_config, patterns=patterns, ingestion_params=ingestion_params, ) @@ -186,24 +192,30 @@ def define_and_ingest( def ingest( self, schema: Schema, - output_config: DBConfig, + target_db_config: DBConfig, patterns: "Patterns | None" = None, ingestion_params: IngestionParams | None = None, ) -> None: """Ingest data into the graph database. + If ingestion_params.clear_data is True, removes all existing data + (without touching the schema) before ingestion. + Args: schema: Schema configuration for the graph - output_config: Target database connection configuration + target_db_config: Target database connection configuration patterns: Patterns instance mapping resources to data sources. If None, defaults to empty Patterns() ingestion_params: IngestionParams instance with ingestion configuration. 
If None, uses default IngestionParams() """ ingestion_params = ingestion_params or IngestionParams() + if ingestion_params.clear_data: + with ConnectionManager(connection_config=target_db_config) as db_client: + db_client.clear_data(schema) caster = Caster(schema=schema, ingestion_params=ingestion_params) caster.ingest( - output_config=output_config, + target_db_config=target_db_config, patterns=patterns or Patterns(), ingestion_params=ingestion_params, ) diff --git a/test/conftest.py b/test/conftest.py index 7b4e7ec3..45cb4072 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -79,18 +79,18 @@ def ingest_atomic(conn_conf, current_path, test_db_name, schema_o, mode, n_cores # Use GraphEngine for the full workflow engine = GraphEngine(target_db_flavor=db_type) - # Define schema first (with clean_start=True) + # Define schema first (with recreate_schema=True) engine.define_schema( schema=schema_o, - output_config=conn_conf, - clean_start=True, + target_db_config=conn_conf, + recreate_schema=True, ) # Then ingest data - ingestion_params = IngestionParams(n_cores=n_cores, clean_start=False) + ingestion_params = IngestionParams(n_cores=n_cores, clear_data=False) engine.ingest( schema=schema_o, - output_config=conn_conf, + target_db_config=conn_conf, patterns=patterns, ingestion_params=ingestion_params, ) diff --git a/test/db/arangos/test_db_index.py b/test/db/arangos/test_db_index.py index 0041fd1a..a9d851ee 100644 --- a/test/db/arangos/test_db_index.py +++ b/test/db/arangos/test_db_index.py @@ -15,7 +15,7 @@ def modes(): def init_db(m, conn_conf, schema, current_path, reset): with ConnectionManager(connection_config=conn_conf) as db_client: - db_client.init_db(schema, clean_start=True) + db_client.init_db(schema, recreate_schema=True) ixs = db_client.fetch_indexes() ixs = {k: v for k, v in ixs.items() if not k.startswith("_")} diff --git a/test/db/tigergraphs/test_db_creation.py b/test/db/tigergraphs/test_db_creation.py index a415d75c..8b466303 100644 --- a/test/db/tigergraphs/test_db_creation.py +++ b/test/db/tigergraphs/test_db_creation.py @@ -82,7 +82,7 @@ def test_schema_creation(conn_conf, test_graph_name, schema_obj): with ConnectionManager(connection_config=conn_conf) as db_client: # init_db will: create graph, define schema, define indexes # Graph name comes from schema.general.name - db_client.init_db(schema_obj, clean_start=True) + db_client.init_db(schema_obj, recreate_schema=True) with ConnectionManager(connection_config=conn_conf) as db_client: # Verify graph exists (using name from schema.general.name) @@ -120,7 +120,7 @@ def test_schema_creation_edges(conn_conf, test_graph_name, schema_obj): with ConnectionManager(connection_config=conn_conf) as db_client: # init_db will: create graph, define schema, define indexes # Graph name comes from schema.general.name - db_client.init_db(schema_obj, clean_start=True) + db_client.init_db(schema_obj, recreate_schema=True) with ConnectionManager(connection_config=conn_conf) as db_client: # Verify graph exists (using name from schema.general.name) diff --git a/test/db/tigergraphs/test_db_index.py b/test/db/tigergraphs/test_db_index.py index 727c0c37..fbcbdf24 100644 --- a/test/db/tigergraphs/test_db_index.py +++ b/test/db/tigergraphs/test_db_index.py @@ -9,7 +9,7 @@ def test_create_vertex_index(conn_conf, schema_obj, test_graph_name): with ConnectionManager(connection_config=conn_conf) as db_client: # Initialize database with schema - db_client.init_db(schema_obj, clean_start=True) + db_client.init_db(schema_obj, recreate_schema=True) # 
Define vertex indexes (indexes should be created in init_db, but call explicitly) db_client.define_vertex_indices(schema_obj.vertex_config) From 7f0ad31de80a1f71f53d7bb365ba5b541e288cdc Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Mon, 2 Feb 2026 00:40:58 +0100 Subject: [PATCH 4/5] improved inferencer : added row count estimates and row samples --- graflo/architecture/onto_sql.py | 17 +++ graflo/db/postgres/conn.py | 187 +++++++++++++++++++++++++++++++- graflo/hq/graph_engine.py | 23 ++++ graflo/hq/inferencer.py | 3 +- 4 files changed, 225 insertions(+), 5 deletions(-) diff --git a/graflo/architecture/onto_sql.py b/graflo/architecture/onto_sql.py index 8d537c82..6f01d43c 100644 --- a/graflo/architecture/onto_sql.py +++ b/graflo/architecture/onto_sql.py @@ -10,6 +10,10 @@ class ColumnInfo(BaseModel): is_nullable: str = "YES" column_default: str | None = None is_pk: bool = False + is_unique: bool = False + ordinal_position: int | None = None + sample_values: list[str] = [] + """Up to 5 sample values from the column (first rows); empty for vertex/edge ColumnInfo.""" class ForeignKeyInfo(BaseModel): @@ -46,9 +50,22 @@ class EdgeTableInfo(BaseModel): relation: str | None = None +class RawTableInfo(BaseModel): + """Raw table metadata: all tables with columns, types, and constraint metadata.""" + + name: str + schema_name: str + columns: list[ColumnInfo] + primary_key: list[str] + foreign_keys: list[ForeignKeyInfo] + row_count_estimate: int | None = None + """Approximate row count from pg_class.reltuples (updated by ANALYZE).""" + + class SchemaIntrospectionResult(BaseModel): """Result of PostgreSQL schema introspection.""" vertex_tables: list[VertexTableInfo] edge_tables: list[EdgeTableInfo] + raw_tables: list[RawTableInfo] schema_name: str diff --git a/graflo/db/postgres/conn.py b/graflo/db/postgres/conn.py index 5e356b83..859fab34 100644 --- a/graflo/db/postgres/conn.py +++ b/graflo/db/postgres/conn.py @@ -23,6 +23,7 @@ from typing import Any import psycopg2 +from psycopg2 import sql from psycopg2.extras import RealDictCursor from graflo.architecture.onto_sql import ( @@ -30,6 +31,7 @@ ForeignKeyInfo, VertexTableInfo, EdgeTableInfo, + RawTableInfo, SchemaIntrospectionResult, ) from graflo.db.connection.onto import PostgresConfig @@ -284,7 +286,8 @@ def _get_table_columns_pg_catalog( pg_catalog.format_type(a.atttypid, a.atttypmod) as type, CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END as is_nullable, pg_catalog.pg_get_expr(d.adbin, d.adrelid) as column_default, - COALESCE(dsc.description, '') as description + COALESCE(dsc.description, '') as description, + a.attnum as ordinal_position FROM pg_catalog.pg_attribute a JOIN pg_catalog.pg_class c ON c.oid = a.attrelid JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace @@ -365,7 +368,8 @@ def get_table_columns( c.character_maximum_length, c.is_nullable, c.column_default, - COALESCE(d.description, '') as description + COALESCE(d.description, '') as description, + c.ordinal_position as ordinal_position FROM information_schema.columns c LEFT JOIN pg_catalog.pg_statio_all_tables st ON st.schemaname = c.table_schema @@ -493,6 +497,61 @@ def get_primary_keys( # Fallback to pg_catalog return self._get_primary_keys_pg_catalog(table_name, schema_name) + def _get_unique_columns_pg_catalog( + self, table_name: str, schema_name: str + ) -> list[str]: + """Get columns in UNIQUE constraints using pg_catalog (fallback method).""" + query = """ + SELECT a.attname + FROM pg_catalog.pg_constraint con + JOIN pg_catalog.pg_class c ON c.oid = 
con.conrelid + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + JOIN pg_catalog.pg_attribute a ON a.attrelid = con.conrelid AND a.attnum = ANY(con.conkey) + WHERE n.nspname = %s + AND c.relname = %s + AND con.contype = 'u' + AND a.attnum > 0 + AND NOT a.attisdropped + ORDER BY array_position(con.conkey, a.attnum); + """ + with self.conn.cursor() as cursor: + cursor.execute(query, (schema_name, table_name)) + return list(dict.fromkeys(row[0] for row in cursor.fetchall())) + + def get_unique_columns( + self, table_name: str, schema_name: str | None = None + ) -> list[str]: + """Get column names that participate in any UNIQUE constraint. + + Tries information_schema first, falls back to pg_catalog if needed. + """ + if schema_name is None: + schema_name = self.config.schema_name or "public" + + try: + query = """ + SELECT kcu.column_name + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + WHERE tc.constraint_type = 'UNIQUE' + AND tc.table_schema = %s + AND tc.table_name = %s + ORDER BY kcu.ordinal_position; + """ + with self.conn.cursor() as cursor: + cursor.execute(query, (schema_name, table_name)) + results = list(dict.fromkeys(row[0] for row in cursor.fetchall())) + if results and self._check_information_schema_reliable(schema_name): + return results + except Exception as e: + logger.debug( + f"information_schema query failed for unique columns: {e}, " + "falling back to pg_catalog" + ) + return self._get_unique_columns_pg_catalog(table_name, schema_name) + def _get_foreign_keys_pg_catalog( self, table_name: str, schema_name: str ) -> list[dict[str, Any]]: @@ -591,6 +650,52 @@ def get_foreign_keys( # Fallback to pg_catalog return self._get_foreign_keys_pg_catalog(table_name, schema_name) + def get_table_row_count_estimate( + self, table_name: str, schema_name: str | None = None + ) -> int | None: + """Return approximate row count from pg_class.reltuples (updated by ANALYZE). + + Avoids full table scan; may be stale until next ANALYZE. 
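+
+        Example:
+            >>> # Illustrative: cheap size check before sampling ("users" is a stand-in table name)
+            >>> conn.get_table_row_count_estimate("users")  # -> e.g. 10342, or None if not found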
+ """ + if schema_name is None: + schema_name = self.config.schema_name or "public" + query = """ + SELECT c.reltuples::bigint AS estimate + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = %s AND c.relname = %s AND c.relkind = 'r'; + """ + with self.conn.cursor() as cursor: + cursor.execute(query, (schema_name, table_name)) + row = cursor.fetchone() + if row is None: + return None + val = row[0] + return int(val) if val is not None else None + + def get_table_sample_rows( + self, + table_name: str, + schema_name: str | None = None, + limit: int = 5, + ) -> list[dict[str, Any]]: + """Return first `limit` rows from the table (no ORDER BY for speed).""" + if schema_name is None: + schema_name = self.config.schema_name or "public" + query = sql.SQL("SELECT * FROM {}.{} LIMIT %s").format( + sql.Identifier(schema_name), + sql.Identifier(table_name), + ) + try: + with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: + cursor.execute(query, (limit,)) + return [dict(row) for row in cursor.fetchall()] + except Exception as e: + logger.debug( + f"Could not fetch sample rows for '{schema_name}.{table_name}': {e}" + ) + return [] + def _is_edge_like_table( self, table_name: str, pk_columns: list[str], fk_columns: list[dict[str, Any]] ) -> bool: @@ -683,8 +788,10 @@ def detect_vertex_tables( # If table has descriptive columns, consider it vertex-like if descriptive_columns: - # Mark primary key columns and convert to ColumnInfo + # Mark primary key and unique columns and convert to ColumnInfo pk_set = set(pk_columns) + unique_columns = self.get_unique_columns(table_name, schema_name) + unique_set = set(unique_columns) column_infos = [] for col in all_columns: column_infos.append( @@ -695,6 +802,8 @@ def detect_vertex_tables( is_nullable=col.get("is_nullable", "YES"), column_default=col.get("column_default"), is_pk=col["name"] in pk_set, + is_unique=col["name"] in unique_set, + ordinal_position=col.get("ordinal_position"), ) ) @@ -770,8 +879,10 @@ def detect_edge_tables( all_columns = self.get_table_columns(table_name, schema_name) - # Mark primary key columns and convert to ColumnInfo + # Mark primary key and unique columns and convert to ColumnInfo pk_set = set(pk_columns) + unique_columns = self.get_unique_columns(table_name, schema_name) + unique_set = set(unique_columns) column_infos = [] for col in all_columns: column_infos.append( @@ -782,6 +893,8 @@ def detect_edge_tables( is_nullable=col.get("is_nullable", "YES"), column_default=col.get("column_default"), is_pk=col["name"] in pk_set, + is_unique=col["name"] in unique_set, + ordinal_position=col.get("ordinal_position"), ) ) @@ -915,6 +1028,70 @@ def detect_edge_tables( return edge_tables + def _build_raw_tables(self, schema_name: str) -> list[RawTableInfo]: + """Build raw table metadata for all tables in the schema.""" + tables = self.get_tables(schema_name) + raw_tables = [] + for table_info in tables: + table_name = table_info["table_name"] + pk_columns = self.get_primary_keys(table_name, schema_name) + fk_columns = self.get_foreign_keys(table_name, schema_name) + unique_columns = self.get_unique_columns(table_name, schema_name) + all_columns = self.get_table_columns(table_name, schema_name) + row_count_estimate = self.get_table_row_count_estimate( + table_name, schema_name + ) + sample_rows = self.get_table_sample_rows(table_name, schema_name, limit=5) + + pk_set = set(pk_columns) + unique_set = set(unique_columns) + # Per-column sample values: list of values from first 5 rows 
(stringified) + column_names = [c["name"] for c in all_columns] + sample_by_col: dict[str, list[str]] = {c: [] for c in column_names} + for row in sample_rows: + for col_name in column_names: + if col_name in row and len(sample_by_col[col_name]) < 5: + v = row[col_name] + sample_by_col[col_name].append("NULL" if v is None else str(v)) + + column_infos = [] + for col in all_columns: + column_infos.append( + ColumnInfo( + name=col["name"], + type=col["type"], + description=col.get("description", ""), + is_nullable=col.get("is_nullable", "YES"), + column_default=col.get("column_default"), + is_pk=col["name"] in pk_set, + is_unique=col["name"] in unique_set, + ordinal_position=col.get("ordinal_position"), + sample_values=sample_by_col.get(col["name"], [])[:5], + ) + ) + + fk_infos = [ + ForeignKeyInfo( + column=fk["column"], + references_table=fk["references_table"], + references_column=fk.get("references_column"), + constraint_name=fk.get("constraint_name"), + ) + for fk in fk_columns + ] + + raw_tables.append( + RawTableInfo( + name=table_name, + schema_name=schema_name, + columns=column_infos, + primary_key=pk_columns, + foreign_keys=fk_infos, + row_count_estimate=row_count_estimate, + ) + ) + return raw_tables + def introspect_schema( self, schema_name: str | None = None ) -> SchemaIntrospectionResult: @@ -936,10 +1113,12 @@ def introspect_schema( vertex_tables = self.detect_vertex_tables(schema_name) edge_tables = self.detect_edge_tables(schema_name) + raw_tables = self._build_raw_tables(schema_name) result = SchemaIntrospectionResult( vertex_tables=vertex_tables, edge_tables=edge_tables, + raw_tables=raw_tables, schema_name=schema_name, ) diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py index 54aa0088..c28b3c9d 100644 --- a/graflo/hq/graph_engine.py +++ b/graflo/hq/graph_engine.py @@ -9,6 +9,7 @@ from graflo import Schema from graflo.onto import DBType +from graflo.architecture.onto_sql import SchemaIntrospectionResult from graflo.db import ConnectionManager, PostgresConnection from graflo.db.connection.onto import DBConfig, PostgresConfig from graflo.hq.caster import Caster, IngestionParams @@ -48,6 +49,28 @@ def __init__( self.target_db_flavor = target_db_flavor self.resource_mapper = ResourceMapper() + def introspect( + self, + postgres_config: PostgresConfig, + schema_name: str | None = None, + ) -> SchemaIntrospectionResult: + """Introspect PostgreSQL schema and return a serializable result. + + Args: + postgres_config: PostgresConfig instance + schema_name: Schema name to introspect (defaults to config schema_name or 'public') + + Returns: + SchemaIntrospectionResult: Introspection result (vertex_tables, edge_tables, + raw_tables, schema_name) suitable for serialization. 
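+
+        Example:
+            >>> # Illustrative sketch, assuming a reachable Postgres and a PostgresConfig `pg_conf`
+            >>> result = engine.introspect(postgres_config=pg_conf, schema_name="public")
+            >>> [(t.name, t.row_count_estimate) for t in result.raw_tables]  # tables with size estimates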
+ """ + with PostgresConnection(postgres_config) as postgres_conn: + inferencer = InferenceManager( + conn=postgres_conn, + target_db_flavor=self.target_db_flavor, + ) + return inferencer.introspect(schema_name=schema_name) + def infer_schema( self, postgres_config: PostgresConfig, diff --git a/graflo/hq/inferencer.py b/graflo/hq/inferencer.py index 7f5be223..6b2914ef 100644 --- a/graflo/hq/inferencer.py +++ b/graflo/hq/inferencer.py @@ -5,6 +5,7 @@ from graflo.db.postgres import PostgresSchemaInferencer, PostgresResourceMapper from graflo.hq.sanitizer import SchemaSanitizer import logging +from graflo.architecture.onto_sql import SchemaIntrospectionResult logger = logging.getLogger(__name__) @@ -33,7 +34,7 @@ def __init__( ) self.mapper = PostgresResourceMapper(fuzzy_threshold=fuzzy_threshold) - def introspect(self, schema_name: str | None = None): + def introspect(self, schema_name: str | None = None) -> SchemaIntrospectionResult: """Introspect PostgreSQL schema. Args: From 46019d3b5e79f37071acaf0965e47d8d6a584049 Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Mon, 2 Feb 2026 00:45:04 +0100 Subject: [PATCH 5/5] version bump --- CHANGELOG.md | 10 ++++++++++ pyproject.toml | 2 +- uv.lock | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3716805c..1164c6aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.4.5] - 2026-02-02 + +### Added +- **Inferencer**: Row count estimates and row samples +- **Discard disconnected vertices**: Option to discard disconnected vertices during graph operations + +### Changed +- **clean_start**: Refactored into `recreate_schema` and `clear_data` for clearer separation of schema and data reset +- **output_config**: Renamed to `target_db_config` + ## [1.4.3] - 2026-01-25 ### Added diff --git a/pyproject.toml b/pyproject.toml index d2d4977d..4da33424 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ description = "A framework for transforming tabular (CSV, SQL) and hierarchical name = "graflo" readme = "README.md" requires-python = ">=3.11" -version = "1.4.4" +version = "1.4.5" [project.optional-dependencies] plot = [ diff --git a/uv.lock b/uv.lock index afeb73d9..21ec64c2 100644 --- a/uv.lock +++ b/uv.lock @@ -348,7 +348,7 @@ wheels = [ [[package]] name = "graflo" -version = "1.4.4" +version = "1.4.5" source = { editable = "." } dependencies = [ { name = "click" },