From be88a99cfe4666159cc0d5d80464e30b03e35614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Benko?= Date: Tue, 9 Sep 2025 11:11:10 +0200 Subject: [PATCH 1/2] DB: fix access to master buffers during DP insertion Access to master buffers during datapoint insertion sometimes results in `KeyError` causing termination of worker thread. Eventually, error "Thread X is dead, more than 20 restarts attempted, giving up..." appears, worker termination procedure gets invoked, but ends up with infinite logging of "N worker threads alive". The result is silent failure of the whole worker and no task processing. --- dp3/database/database.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dp3/database/database.py b/dp3/database/database.py index 35b17302..ee6f210b 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -424,10 +424,14 @@ def insert_datapoints( with self._master_buffer_locks[etype]: if eid in self._master_buffers[etype]: for attr, push_dps in master_changes["pushes"].items(): + if "pushes" not in self._master_buffers[etype][eid]: + self._master_buffers[etype][eid]["pushes"] = {} if attr in self._master_buffers[etype][eid]["pushes"]: self._master_buffers[etype][eid]["pushes"][attr].extend(push_dps) else: self._master_buffers[etype][eid]["pushes"][attr] = push_dps + if "$set" not in self._master_buffers[etype][eid]: + self._master_buffers[etype][eid]["$set"] = {} self._master_buffers[etype][eid]["$set"].update(master_changes["$set"]) else: self._master_buffers[etype][eid] = master_changes From 9e8ca9d455be00c92eaf20fc397a6652589e6c3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Benko?= Date: Tue, 9 Sep 2025 11:13:04 +0200 Subject: [PATCH 2/2] DB: refactor access to master buffers during DP insertion --- dp3/database/database.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dp3/database/database.py b/dp3/database/database.py index ee6f210b..d6df921c 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -426,10 +426,9 @@ def insert_datapoints( for attr, push_dps in master_changes["pushes"].items(): if "pushes" not in self._master_buffers[etype][eid]: self._master_buffers[etype][eid]["pushes"] = {} - if attr in self._master_buffers[etype][eid]["pushes"]: - self._master_buffers[etype][eid]["pushes"][attr].extend(push_dps) - else: - self._master_buffers[etype][eid]["pushes"][attr] = push_dps + if attr not in self._master_buffers[etype][eid]["pushes"]: + self._master_buffers[etype][eid]["pushes"][attr] = [] + self._master_buffers[etype][eid]["pushes"][attr].extend(push_dps) if "$set" not in self._master_buffers[etype][eid]: self._master_buffers[etype][eid]["$set"] = {} self._master_buffers[etype][eid]["$set"].update(master_changes["$set"])