From 4beb3ad27936176643f3651d950de58d167fe5d1 Mon Sep 17 00:00:00 2001 From: Timur Sheidaev <69433421+TSheyd@users.noreply.github.com> Date: Sat, 7 Feb 2026 02:06:35 +0900 Subject: [PATCH 1/2] fix: replace attributes on update instead of upsert --- datapipe/store/neo4j.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datapipe/store/neo4j.py b/datapipe/store/neo4j.py index d9411234..35be3b9c 100644 --- a/datapipe/store/neo4j.py +++ b/datapipe/store/neo4j.py @@ -109,9 +109,10 @@ def insert_rows(self, df: DataDF) -> None: if self._mode == "node": for node_type, gdf in df.groupby("node_type"): # group by label for single-label bulk queries rows = [ - {"id": r["node_id"], "props": r.get("attributes", {}) or {}} for r in gdf.to_dict(orient="records") + {"id": r["node_id"], "props": {"id": r["node_id"], **(r.get("attributes", {}) or {})}} + for r in gdf.to_dict(orient="records") ] - cypher = f"UNWIND $rows AS row\nMERGE (n:`{node_type}` {{id: row.id}})\nSET n += row.props" + cypher = f"UNWIND $rows AS row\nMERGE (n:`{node_type}` {{id: row.id}})\nSET n = row.props" self._run_query(cypher, {"rows": rows}) else: # unwind edges in the same fashion @@ -130,7 +131,7 @@ def insert_rows(self, df: DataDF) -> None: f"MERGE (from:`{from_type}` {{id: row.from_id}})\n" f"MERGE (to:`{to_type}` {{id: row.to_id}})\n" f"MERGE (from)-[r:`{edge_label}`]->(to)\n" - f"SET r += row.props" + f"SET r = row.props" ) self._run_query(cypher, {"rows": rows}) From d5c3c6f4cd262dc62ad0ce24367688370b962468 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sat, 7 Feb 2026 13:38:14 +0400 Subject: [PATCH 2/2] v0.14.8a1 --- CHANGELOG.md | 4 ++++ pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b8a478b..5178060a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# WIP 0.14.8 + +* Neo4JStore - rewrite attributes instead of upsert + # 0.14.7 * Make CLI accept multiple `--name` values diff --git a/pyproject.toml b/pyproject.toml index 91952309..ed2c74e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "datapipe-core" -version = "0.14.7" +version = "0.14.8-alpha.1" description = "`datapipe` is a realtime incremental ETL library for Python application" authors = [{ name = "Andrey Tatarinov", email = "a@tatarinov.co" }] readme = "README.md"