diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b8a478b..5178060a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# WIP 0.14.8 + +* Neo4JStore - replace node/edge attributes on write (`SET =`) instead of merging them into existing ones (`SET +=`) + # 0.14.7 * Make CLI accept multiple `--name` values diff --git a/datapipe/store/neo4j.py b/datapipe/store/neo4j.py index d9411234..35be3b9c 100644 --- a/datapipe/store/neo4j.py +++ b/datapipe/store/neo4j.py @@ -109,9 +109,10 @@ def insert_rows(self, df: DataDF) -> None: if self._mode == "node": for node_type, gdf in df.groupby("node_type"): # group by label for single-label bulk queries rows = [ - {"id": r["node_id"], "props": r.get("attributes", {}) or {}} for r in gdf.to_dict(orient="records") + {"id": r["node_id"], "props": {"id": r["node_id"], **(r.get("attributes", {}) or {})}} + for r in gdf.to_dict(orient="records") ] - cypher = f"UNWIND $rows AS row\nMERGE (n:`{node_type}` {{id: row.id}})\nSET n += row.props" + cypher = f"UNWIND $rows AS row\nMERGE (n:`{node_type}` {{id: row.id}})\nSET n = row.props" self._run_query(cypher, {"rows": rows}) else: # unwind edges in the same fashion @@ -130,7 +131,7 @@ def insert_rows(self, df: DataDF) -> None: f"MERGE (from:`{from_type}` {{id: row.from_id}})\n" f"MERGE (to:`{to_type}` {{id: row.to_id}})\n" f"MERGE (from)-[r:`{edge_label}`]->(to)\n" - f"SET r += row.props" + f"SET r = row.props" ) self._run_query(cypher, {"rows": rows}) diff --git a/pyproject.toml b/pyproject.toml index 91952309..ed2c74e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "datapipe-core" -version = "0.14.7" +version = "0.14.8-alpha.1" description = "`datapipe` is a realtime incremental ETL library for Python application" authors = [{ name = "Andrey Tatarinov", email = "a@tatarinov.co" }] readme = "README.md"