
Conversation


@PythonFZ PythonFZ commented Jan 16, 2026

Implements a cluster heartbeat mechanism to detect and clean up stale workers that were registered with a previous server instance.

Changes:

  • Add ClusterHeartbeat class with background TTL refresh thread
  • Add stale_cleanup module for cleaning up stale extension/filesystem workers
  • Filter stale workers in WorkerStats.fetch() and job dispatcher
  • Add lazy cleanup on schema retrieval and filesystem operations
  • Add registration_timestamp to filesystem metadata
  • Add worker_heartbeat_ttl config option (default 60s)

This handles cold restarts (all servers down for > TTL). Partial failures with rolling restarts are documented as a limitation.
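For orientation, here is a minimal sketch of the pattern (illustrative names only, not the actual ClusterHeartbeat API): one Redis key marks the current server generation, and a daemon thread keeps it alive while any server is running.

import threading
import time

import redis

HEARTBEAT_KEY = "cluster:heartbeat"


def start_heartbeat(client: redis.Redis, ttl_seconds: int = 60) -> threading.Event:
    """Write the heartbeat key and keep refreshing its TTL from a daemon thread."""
    client.set(HEARTBEAT_KEY, time.time(), ex=ttl_seconds)
    stop = threading.Event()

    def refresh() -> None:
        # Refresh at half the TTL, so the key only expires if every server is
        # down for longer than the TTL (the cold-restart case described above).
        while not stop.wait(ttl_seconds // 2):
            client.expire(HEARTBEAT_KEY, ttl_seconds)

    threading.Thread(target=refresh, daemon=True).start()
    return stop

A worker whose registration_timestamp predates the value stored under cluster:heartbeat can then be treated as belonging to a previous server instance.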

Co-Authored-By: Claude Opus 4.5 noreply@anthropic.com

Summary by CodeRabbit

  • New Features

    • Automatic detection and cleanup of stale workers after server restart via a cluster heartbeat; stale workers are excluded from assignments and availability.
    • Filesystem and room endpoints perform pre-fetch cleanup and now handle disconnected workers gracefully.
    • Worker registrations include a registration timestamp to support staleness checks.
  • Configuration

    • Added worker_heartbeat_ttl setting to control cluster heartbeat TTL (default: 60s).
  • Tests

    • Added comprehensive tests covering heartbeat, cleanup, and filtering behavior.

✏️ Tip: You can customize this high-level summary in your review settings.


coderabbitai bot commented Jan 16, 2026

📝 Walkthrough

Walkthrough

Adds a Redis-backed ClusterHeartbeat for detecting stale workers after server restarts, plus stale-worker cleanup utilities and integrations that filter/remove workers registered before the current heartbeat across dispatch, stats, filesystem, room endpoints, and server startup. Tests cover heartbeat and cleanup behaviors.

Changes

Cohort / File(s) and summary:

  • Heartbeat infra (src/zndraw/app/cluster_heartbeat.py, src/zndraw/app/redis_keys.py): New ClusterHeartbeat dataclass with start/stop, background TTL refresh, and classmethods to read the heartbeat and check staleness; adds ClusterKeys.HEARTBEAT.
  • Config (src/zndraw/config.py): Adds ZnDrawConfig.worker_heartbeat_ttl: int (default 60, min 10) controlling heartbeat TTL/refresh.
  • Server init (src/zndraw/server.py): Initialize/start ClusterHeartbeat in Redis-backed SocketIO mode and store it in app.extensions["cluster_heartbeat"].
  • Stale cleanup module (src/zndraw/app/stale_cleanup.py): New module with cleanup_stale_extension_workers, _cleanup_single_extension_worker, cleanup_stale_filesystem_worker, and cleanup_all_stale_workers_for_category; deletes stale worker keys, updates reverse lookups, and emits SocketIO events.
  • Filesystem routes (src/zndraw/app/filesystem_routes.py): Invoke cleanup_stale_filesystem_worker before filesystem endpoints; return 404 if the filesystem was removed due to a stale worker.
  • Room routes (src/zndraw/app/room_routes.py): Call cleanup_all_stale_workers_for_category (room and global) prior to building the room schema.
  • Job dispatch / worker stats (src/zndraw/app/job_dispatcher.py, src/zndraw/app/worker_stats.py): Fetch worker registration timestamps, compare them to the heartbeat, and filter out stale workers before capacity/availability calculations.
  • Worker registration (src/zndraw/app/worker_routes.py): Add registration_timestamp (UTC string) to filesystem metadata on registration.
  • Tests (tests/test_stale_cleanup.py): New comprehensive tests for ClusterHeartbeat lifecycle, extension/filesystem cleanup, dispatch/stats filtering, and orchestration across scenarios.

Sequence Diagram(s)

sequenceDiagram
    participant Server
    participant ClusterHeartbeat
    participant Redis
    participant Route
    participant Cleanup
    participant SocketIO

    Server->>ClusterHeartbeat: start(ttl)
    ClusterHeartbeat->>Redis: SET cluster:heartbeat {ts}
    ClusterHeartbeat->>Redis: EXPIRE cluster:heartbeat ttl
    ClusterHeartbeat->>ClusterHeartbeat: launch refresh thread

    Note over ClusterHeartbeat: refresh loop runs every refresh_interval
    ClusterHeartbeat->>Redis: EXPIRE cluster:heartbeat ttl

    Route->>Redis: GET cluster:heartbeat
    Route->>Cleanup: cleanup_stale_*(redis, socketio, room_id, ...)
    Cleanup->>Redis: HGETALL / HGET worker registrations
    Cleanup->>ClusterHeartbeat: is_worker_stale(reg_ts)
    alt worker is stale
        Cleanup->>Redis: DEL worker keys, HDEL reverse lookups, DEL capacity/active
        Cleanup->>SocketIO: emit INVALIDATE_SCHEMA / FILESYSTEMS_UPDATE
        Cleanup->>Route: indicate removed (404 where applicable)
    else worker is valid
        Cleanup->>Route: leave mappings intact
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐇 A tiny heartbeat wakes in Redis light,
It marks the time and guards the nightly fence,
Stale workers swept away with gentle might,
New tasks dance in queues, alive and tense,
— From this rabbit, happy maintenance!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Description Check: ✅ Passed (check skipped; CodeRabbit’s high-level summary is enabled)
  • Title Check: ✅ Passed. The title directly and clearly describes the main change: implementing stale worker cleanup after server restart, which is the central feature across all modified files.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/zndraw/app/cluster_heartbeat.py`:
- Around line 48-88: Protect against double-start by adding a start guard:
introduce/ use a small threading.Lock (e.g., self._start_lock) and in start()
acquire it, then check if self._thread is not None and self._thread.is_alive();
if so, refresh/return the current heartbeat timestamp (by reading
ClusterKeys.HEARTBEAT or returning previously computed timestamp) instead of
creating a new thread; otherwise proceed to set timestamp, start the new
Thread(target=self._refresh_loop, ...), store it in self._thread, and release
the lock. Ensure stop() clears/self._thread = None so subsequent start() can
create a new thread.

In `@tests/test_stale_cleanup.py`:
- Around line 24-25: Refactor the class-based tests by converting the
TestClusterHeartbeat class into standalone pytest functions: move each test
method out of the TestClusterHeartbeat class and rename them to descriptive
function names such as test_cluster_heartbeat_start_creates_new_heartbeat and
test_cluster_heartbeat_start_reuses_existing_heartbeat, ensure they accept the
same fixtures (e.g., redis_client) previously used as method parameters, remove
the class wrapper and any self references, and keep assertions and helper calls
intact so the behavior of ClusterHeartbeat tests remains identical.
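One possible shape of the start guard asked for in the cluster_heartbeat.py prompt above (attribute and key names are assumptions, not the real ClusterHeartbeat internals):

import threading
import time

import redis


class GuardedHeartbeat:
    """Sketch only: double-start protection around a TTL-refresh thread."""

    def __init__(self, client: redis.Redis, ttl_seconds: int = 60) -> None:
        self.client = client
        self.ttl_seconds = ttl_seconds
        self._start_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None

    def _refresh_loop(self) -> None:
        while not self._stop_event.wait(max(1, self.ttl_seconds // 2)):
            self.client.expire("cluster:heartbeat", self.ttl_seconds)

    def start(self) -> None:
        with self._start_lock:
            if self._thread is not None and self._thread.is_alive():
                # Already running: keep the existing heartbeat alive, no second thread.
                self.client.expire("cluster:heartbeat", self.ttl_seconds)
                return
            self._stop_event.clear()
            self.client.set("cluster:heartbeat", time.time(), ex=self.ttl_seconds)
            self._thread = threading.Thread(target=self._refresh_loop, daemon=True)
            self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=5)
            self._thread = None  # lets a later start() create a fresh thread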
🧹 Nitpick comments (6)
src/zndraw/server.py (1)

189-202: LGTM with a minor consideration.

The cluster heartbeat is correctly initialized only in multi-worker (Redis) mode. The lazy import follows project guidelines.

Note: The ClusterHeartbeat.stop() method is never called on application shutdown. Since the thread is a daemon thread, it will terminate with the process, but for graceful shutdown scenarios (e.g., tests, hot reloading), consider registering a teardown handler:

💡 Optional: Add shutdown cleanup
import atexit

# After heartbeat.start()
atexit.register(heartbeat.stop)
src/zndraw/app/job_dispatcher.py (1)

75-75: Add strict=True to zip() for safety.

Per static analysis (B905): zip() should use explicit strict= parameter. While valid_workers and capacities should always have matching lengths here, adding strict=True catches logic errors if the lists ever diverge.

🔧 Suggested fix
-    for worker_id, capacity in zip(valid_workers, capacities):
+    for worker_id, capacity in zip(valid_workers, capacities, strict=True):
src/zndraw/app/worker_stats.py (1)

64-66: Misleading comment - these workers aren't "likely stale".

At this point, valid_workers has already filtered out stale workers based on heartbeat timestamps. Workers without a capacity key here are more likely due to:

  • Race condition where worker disconnected after being added to valid_workers
  • Capacity key expired or was deleted independently
💡 Suggested comment improvement
             for capacity in capacities:
-                # Skip workers without capacity key (likely stale)
+                # Skip workers without capacity key (e.g., disconnected after filtering)
                 if capacity is None:
                     continue
src/zndraw/app/cluster_heartbeat.py (2)

104-106: Broad exception catch is acceptable here.

Static analysis flags Exception, but for a background daemon thread that must remain resilient to transient failures (Redis connection hiccups, network blips), catching broadly and logging is the right approach. Crashing would be worse than temporary refresh failures.

Consider being more specific if you want to suppress the linter warning:

💡 Optional: More specific exception handling
-            except Exception as e:
+            except (ConnectionError, TimeoutError, OSError) as e:
                 # Log warning but don't crash - Redis might have a brief hiccup
                 log.warning(f"Failed to refresh cluster heartbeat (will retry): {e}")

43-46: Consider minimum refresh interval.

If ttl_seconds is set to 1, refresh_interval would be 0, causing the refresh loop to spin without delay. While the default is 60s and this edge case is unlikely, a floor could prevent accidental tight loops.

💡 Optional safeguard
     @property
     def refresh_interval(self) -> int:
         """Refresh interval is half the TTL."""
-        return self.ttl_seconds // 2
+        return max(1, self.ttl_seconds // 2)
src/zndraw/app/stale_cleanup.py (1)

99-133: Consider adding a brief docstring with parameters.

While the one-line docstring is acceptable for a private helper, adding parameter descriptions would improve maintainability given the number of parameters.
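As an illustration of the requested numpy-style docstring (the parameter list below is a guess for the sake of the example, not the helper's real signature):

def _cleanup_single_extension_worker(redis_client, socketio, worker_id, room_id=None):
    """Remove one stale extension worker and its Redis bookkeeping.

    Parameters
    ----------
    redis_client : redis.Redis
        Client used to delete the worker's keys and reverse lookups.
    socketio : flask_socketio.SocketIO
        Used to emit schema-invalidation events after removal.
    worker_id : str
        Identifier of the stale worker to remove.
    room_id : str | None
        Room scope of the worker; None for globally registered workers.
    """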

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 450bc3a and 29bdee3.

📒 Files selected for processing (11)
  • src/zndraw/app/cluster_heartbeat.py
  • src/zndraw/app/filesystem_routes.py
  • src/zndraw/app/job_dispatcher.py
  • src/zndraw/app/redis_keys.py
  • src/zndraw/app/room_routes.py
  • src/zndraw/app/stale_cleanup.py
  • src/zndraw/app/worker_routes.py
  • src/zndraw/app/worker_stats.py
  • src/zndraw/config.py
  • src/zndraw/server.py
  • tests/test_stale_cleanup.py
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: All default values must be defined exclusively within the Pydantic model - do not scatter fallback logic throughout the codebase
Do not perform null checks combined with hardcoded literals for default values - rely on the Pydantic schema to populate default values during initialization
You cannot use LUA scripts in Redis
If sensible, implement collections.abc interfaces for your classes, such as MutableMapping or MutableSequence
Use numpy style docstrings that are concise and to the point
Use type hints wherever possible - use list[int|float] | None instead of t.Optional[t.List[int|float]]
Imports should always be at the top of the file unless they affect startup time of ZnDraw and can be lazy loaded

Files:

  • src/zndraw/app/worker_routes.py
  • src/zndraw/app/worker_stats.py
  • tests/test_stale_cleanup.py
  • src/zndraw/app/stale_cleanup.py
  • src/zndraw/config.py
  • src/zndraw/app/redis_keys.py
  • src/zndraw/app/job_dispatcher.py
  • src/zndraw/app/filesystem_routes.py
  • src/zndraw/server.py
  • src/zndraw/app/cluster_heartbeat.py
  • src/zndraw/app/room_routes.py
**/test_*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/test_*.py: Do not use @pytest.mark.xfail or similar - all tests must pass
When designing new tests, read the old tests first to understand the existing patterns
Use pytest.mark.parametrize to avoid code duplication in tests
Tests should be very specific and test only one thing
Avoid complex test setups
Each test must be a function, not a method of a class

Files:

  • tests/test_stale_cleanup.py
🧬 Code graph analysis (8)
src/zndraw/app/worker_routes.py (1)
src/zndraw/utils/time.py (1)
  • utc_now_timestamp (33-41)
src/zndraw/app/worker_stats.py (2)
src/zndraw/app/cluster_heartbeat.py (2)
  • ClusterHeartbeat (23-158)
  • get_heartbeat_timestamp (112-127)
src/zndraw/app/redis_keys.py (1)
  • worker_capacity_key (124-133)
src/zndraw/app/stale_cleanup.py (3)
src/zndraw/app/cluster_heartbeat.py (2)
  • ClusterHeartbeat (23-158)
  • is_worker_stale (130-158)
src/zndraw/app/constants.py (1)
  • SocketEvents (4-32)
src/zndraw/app/redis_keys.py (15)
  • ExtensionKeys (12-145)
  • FilesystemKeys (149-238)
  • WorkerKeys (684-691)
  • for_global_extension (76-91)
  • for_extension (23-41)
  • global_user_extensions_key (108-121)
  • user_extensions_key (59-73)
  • active_jobs (689-691)
  • for_global_filesystem (181-198)
  • for_filesystem (159-178)
  • metadata (487-495)
  • global_user_filesystems_key (222-238)
  • user_filesystems_key (201-219)
  • schema_key (44-56)
  • global_schema_key (94-105)
src/zndraw/app/job_dispatcher.py (2)
src/zndraw/app/cluster_heartbeat.py (2)
  • ClusterHeartbeat (23-158)
  • get_heartbeat_timestamp (112-127)
src/zndraw/app/redis_keys.py (2)
  • ExtensionKeys (12-145)
  • worker_capacity_key (124-133)
src/zndraw/app/filesystem_routes.py (1)
src/zndraw/app/stale_cleanup.py (1)
  • cleanup_stale_filesystem_worker (136-213)
src/zndraw/server.py (1)
src/zndraw/app/cluster_heartbeat.py (2)
  • ClusterHeartbeat (23-158)
  • start (48-88)
src/zndraw/app/cluster_heartbeat.py (1)
src/zndraw/app/redis_keys.py (1)
  • ClusterKeys (740-747)
src/zndraw/app/room_routes.py (1)
src/zndraw/app/stale_cleanup.py (1)
  • cleanup_all_stale_workers_for_category (216-256)
🪛 Ruff (0.14.11)
src/zndraw/app/job_dispatcher.py

75-75: zip() without an explicit strict= parameter

Add explicit value for parameter strict=

(B905)

src/zndraw/app/cluster_heartbeat.py

104-104: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: pytest (3.11, ubuntu-latest)
  • GitHub Check: pytest (3.13, ubuntu-latest)
  • GitHub Check: pytest (3.12, ubuntu-latest)
🔇 Additional comments (17)
src/zndraw/config.py (1)

161-167: LGTM!

The new worker_heartbeat_ttl field follows the established Pydantic pattern, with the default value properly defined in the model. The ge=10 constraint sensibly prevents excessively short TTLs that could cause stability issues.

Consider adding this field to log_config() (around line 233) for operational visibility during debugging.
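For reference, the described field shape in Pydantic terms (a sketch mirroring the summary above, not a copy of the actual ZnDrawConfig code):

from pydantic import BaseModel, Field


class ConfigSketch(BaseModel):
    worker_heartbeat_ttl: int = Field(
        default=60,  # default lives in the model, per the project guideline
        ge=10,       # floor guards against TTLs too short to refresh reliably
        description="TTL in seconds for the cluster heartbeat key.",
    )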

src/zndraw/app/redis_keys.py (1)

740-747: LGTM!

The ClusterKeys class follows the same pattern as GlobalIndexKeys for cluster-wide constants. The key naming convention (cluster:heartbeat) is consistent with the existing key structure.

src/zndraw/app/worker_routes.py (1)

453-453: LGTM!

The registration_timestamp field enables stale worker detection for filesystem workers. The string conversion is appropriate for Redis hash storage, and ClusterHeartbeat.is_worker_stale() correctly handles both float and str timestamps.

src/zndraw/app/job_dispatcher.py (2)

40-43: LGTM!

The lazy import pattern and heartbeat timestamp retrieval are correct. Fetching the heartbeat once at the start avoids multiple Redis calls during worker iteration.


51-63: LGTM!

The stale worker filtering logic correctly identifies and skips workers registered before the current cluster heartbeat. The early return optimization avoids unnecessary pipeline operations when all workers are stale.

src/zndraw/app/room_routes.py (1)

701-706: LGTM! Lazy cleanup on schema retrieval is a good pattern.

The cleanup is correctly invoked for both room-scoped and global workers before fetching schemas, ensuring stale workers are removed before returning data to clients.

src/zndraw/app/worker_stats.py (1)

44-50: LGTM! Stale worker filtering logic is correct.

Workers registered before the heartbeat timestamp are correctly excluded, and the fallback when heartbeat_ts is None is safe (all workers pass through when staleness cannot be determined).

src/zndraw/app/filesystem_routes.py (3)

142-149: LGTM! Correct lazy cleanup pattern for filesystem workers.

The cleanup check after confirming worker_id exists ensures we only attempt cleanup when a worker record is present. Returning 404 with "(worker disconnected)" provides useful context to clients about why the filesystem is unavailable.
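The pattern boils down to something like the following Flask-style sketch (route path and helper functions are placeholders, not the project's real signatures):

from flask import Flask, abort, jsonify

app = Flask(__name__)


def _find_worker(fs_name: str) -> str | None:
    """Placeholder lookup of the worker id registered for a filesystem."""
    return None


def _cleanup_if_stale(worker_id: str) -> bool:
    """Placeholder: remove the registration and return True if the worker was stale."""
    return False


@app.get("/filesystems/<fs_name>")
def get_filesystem(fs_name: str):
    worker_id = _find_worker(fs_name)
    if worker_id is None:
        abort(404)
    # Lazy cleanup: drop registrations whose worker predates the cluster heartbeat.
    if _cleanup_if_stale(worker_id):
        abort(404, description=f"Filesystem {fs_name} unavailable (worker disconnected)")
    return jsonify({"filesystem": fs_name, "worker": worker_id})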


261-269: LGTM! Consistent cleanup pattern.


423-431: LGTM! Consistent with other filesystem operations.

tests/test_stale_cleanup.py (2)

101-111: Test could be flaky due to race condition.

The assertion heartbeat._thread is None or not heartbeat._thread.is_alive() handles both cases, but there is a potential race where the thread reference is cleared after the first check and before is_alive() is called. Consider adding a small delay or using a more deterministic check; a sketch follows below.

However, this is a minor concern since the stop() method uses join(timeout=5), which should ensure the thread terminates.
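One such deterministic variant (the heartbeat fixture and the _thread attribute name are taken from the surrounding discussion, not verified against the test file):

def test_stop_terminates_refresh_thread(heartbeat):
    thread = heartbeat._thread  # capture the reference before stop() can clear it
    heartbeat.stop()            # stop() joins with timeout=5, per the note above
    assert thread is None or not thread.is_alive()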


473-499: LGTM! Good coverage of job dispatcher filtering.

Tests verify that get_available_workers correctly excludes stale workers and returns only valid workers with available capacity.

src/zndraw/app/cluster_heartbeat.py (1)

129-158: LGTM! Staleness check logic is correct.

The method correctly:

  • Returns False when no heartbeat exists (can't determine staleness)
  • Accepts both float and str timestamps for flexibility
  • Uses strict < comparison so workers registered at the exact heartbeat moment are not considered stale (see the sketch after this list)
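Taken together, those three points amount to roughly the following standalone check (a sketch with assumed names, not the actual classmethod):

import redis


def worker_is_stale(client: redis.Redis, registration_ts: float | str) -> bool:
    heartbeat = client.get("cluster:heartbeat")
    if heartbeat is None:
        return False  # no heartbeat: staleness cannot be determined
    # Strict "<": a worker registered at exactly the heartbeat moment stays valid.
    return float(registration_ts) < float(heartbeat)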
src/zndraw/app/stale_cleanup.py (4)

1-15: LGTM!

Module docstring is clear and concise. Imports are properly organized at the top of the file, and the logger is correctly initialized.


18-96: LGTM!

The function logic is sound. It correctly:

  • Handles both global and room-scoped extensions
  • Uses ClusterHeartbeat.is_worker_stale for staleness detection
  • Only deletes the extension schema when no workers remain AND no pending jobs exist
  • Properly scopes the INVALIDATE_SCHEMA event emission

The numpy-style docstring and modern type hints (str | None, list[str]) follow the coding guidelines.


136-213: LGTM!

The function correctly handles:

  • Global vs room-scoped filesystem cleanup
  • Legacy filesystems without registration_timestamp (graceful skip with debug log)
  • Complete cleanup of metadata, worker key, and reverse lookup
  • Properly scoped FILESYSTEMS_UPDATE event emission

Good defensive coding for backward compatibility.


216-256: LGTM!

Clean orchestration function that correctly iterates over all extensions in a category and delegates cleanup to cleanup_stale_extension_workers. The return type accurately reflects the collected results.
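A rough shape of such an orchestration (all names and signatures here are placeholders rather than the module's real API):

import redis


def _cleanup_one_extension(client: redis.Redis, extension: str) -> list[str]:
    """Placeholder for the per-extension cleanup delegate."""
    return []


def cleanup_category(client: redis.Redis, category_schema_key: str) -> dict[str, list[str]]:
    removed: dict[str, list[str]] = {}
    # Walk every extension registered under this category's schema hash and
    # collect the workers that were removed as stale.
    for extension in client.hkeys(category_schema_key):
        stale = _cleanup_one_extension(client, extension)
        if stale:
            removed[extension] = stale
    return removed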

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


codecov-commenter commented Jan 16, 2026

Codecov Report

❌ Patch coverage is 94.52632% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.22%. Comparing base (9617d98) to head (8a5cdf4).
⚠️ Report is 1 commit behind head on main.
✅ All tests successful. No failed tests found.

Files with missing lines (patch coverage, lines missing):
  • src/zndraw/app/filesystem_routes.py: 0.00%, 16 missing ⚠️
  • src/zndraw/app/stale_cleanup.py: 95.06%, 4 missing ⚠️
  • src/zndraw/app/room_routes.py: 0.00%, 3 missing ⚠️
  • src/zndraw/app/cluster_heartbeat.py: 96.61%, 2 missing ⚠️
  • src/zndraw/app/worker_stats.py: 92.30%, 1 missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #860      +/-   ##
==========================================
+ Coverage   80.04%   80.22%   +0.17%     
==========================================
  Files         165      170       +5     
  Lines       20131    20718     +587     
==========================================
+ Hits        16114    16621     +507     
- Misses       4017     4097      +80     

☔ View full report in Codecov by Sentry.

- Add double-start protection to ClusterHeartbeat using threading.Lock
- Add minimum refresh interval (5s) to prevent tight loops
- Add strict=True to zip() in job_dispatcher for safety
- Fix misleading comment in worker_stats.py
- Add detailed docstring to _cleanup_single_extension_worker
- Refactor all test classes to standalone pytest functions
- Add tests for double-start, restart, and minimum interval

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/zndraw/app/job_dispatcher.py`:
- Around line 53-59: The loop that compares worker registration timestamps to
heartbeat_ts in job_dispatcher.py can raise on malformed reg_ts when calling
float(reg_ts); wrap the conversion of reg_ts to float in a try/except (handling
ValueError/TypeError), log a warning identifying worker_id and the bad reg_ts,
and skip that worker (continue) so invalid/malformed Redis values do not abort
worker selection; update the block around the for loop where worker_id, reg_ts
are used to perform the guarded conversion and subsequent comparison with
heartbeat_ts.

In `@src/zndraw/app/worker_stats.py`:
- Around line 45-50: The loop that builds valid_workers calls float(reg_ts)
without guarding against malformed timestamps; modify the loop (in the code
handling all_workers and heartbeat_ts—the block that appends to valid_workers)
to wrap the float(reg_ts) conversion in a try/except (catch
ValueError/TypeError), skip that worker_id on failure, and optionally emit a
debug/warn log mentioning the worker_id and bad reg_ts; apply the same defensive
change to the equivalent timestamp conversion in job_dispatcher.py (the similar
block around line 54).
🧹 Nitpick comments (6)
src/zndraw/app/cluster_heartbeat.py (3)

41-42: Avoid duplicating the default TTL outside the config model.

ttl_seconds defines a local default (60). If the Pydantic config already owns this default, consider making the field required here (and update the docstring) to keep a single source of truth. As per coding guidelines, defaults should live in the Pydantic model.

♻️ Proposed refactor
-    ttl_seconds: int = 60
+    ttl_seconds: int

67-75: Synchronize stop() with start() to avoid lifecycle races.

stop() doesn’t take _start_lock, so a concurrent start() can clear _stop_event after stop() sets it and still spawn a thread. Holding the same lock keeps the lifecycle consistent.

🧵 Proposed fix
     def stop(self) -> None:
         """Stop the heartbeat refresh thread."""
-        self._stop_event.set()
-        if self._thread:
-            self._thread.join(timeout=5)
-            self._thread = None
+        with self._start_lock:
+            self._stop_event.set()
+            if self._thread:
+                self._thread.join(timeout=5)
+                self._thread = None

Also applies to: 108-113


118-124: Catch Redis-specific errors in the refresh loop.

Catching Exception can mask programming errors; restrict to redis.exceptions.RedisError (or the appropriate base for redis‑py 6.4.0) and handle unexpected errors separately.

🐛 Proposed refinement
-import logging
-import threading
-import time
-import typing as t
+import logging
+import threading
+import time
+import typing as t
+import redis
@@
-            except Exception as e:
+            except redis.exceptions.RedisError as e:
                 # Log warning but don't crash - Redis might have a brief hiccup
                 log.warning(f"Failed to refresh cluster heartbeat (will retry): {e}")
src/zndraw/app/stale_cleanup.py (1)

50-58: Cache the heartbeat timestamp once per cleanup.

ClusterHeartbeat.is_worker_stale() performs a Redis GET per call; in this loop that’s one GET per worker. Fetch the heartbeat once and compare locally to cut round‑trips.

♻️ Proposed refactor
-    stale_workers = []
-    for worker_id, reg_timestamp in workers.items():
-        if ClusterHeartbeat.is_worker_stale(redis_client, reg_timestamp):
-            stale_workers.append(worker_id)
+    heartbeat_ts = ClusterHeartbeat.get_heartbeat_timestamp(redis_client)
+    if heartbeat_ts is None:
+        return []
+
+    stale_workers = []
+    for worker_id, reg_timestamp in workers.items():
+        if float(reg_timestamp) < heartbeat_ts:
+            stale_workers.append(worker_id)
src/zndraw/app/worker_stats.py (1)

36-36: Move ClusterHeartbeat import to module scope.

The import is inexpensive and avoids per-call locals; if there’s no circular dependency, prefer a top-level import. As per coding guidelines, imports should live at the top unless intentionally lazy.

♻️ Proposed refactor
 from dataclasses import dataclass
 from typing import Any
 
+from .cluster_heartbeat import ClusterHeartbeat
 from .redis_keys import ExtensionKeys
@@
-        from .cluster_heartbeat import ClusterHeartbeat
src/zndraw/app/job_dispatcher.py (1)

40-40: Move ClusterHeartbeat import to module scope.

The import is light and cached; prefer a top‑level import if there’s no circular dependency. As per coding guidelines, imports should live at the top unless intentionally lazy.

♻️ Proposed refactor
 import logging
 
 from zndraw.utils.time import utc_now_timestamp
 
+from .cluster_heartbeat import ClusterHeartbeat
 from .constants import SocketEvents
 from .job_manager import JobManager
 from .redis_keys import ExtensionKeys
@@
-    from .cluster_heartbeat import ClusterHeartbeat
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 29bdee3 and 8a5cdf4.

📒 Files selected for processing (5)
  • src/zndraw/app/cluster_heartbeat.py
  • src/zndraw/app/job_dispatcher.py
  • src/zndraw/app/stale_cleanup.py
  • src/zndraw/app/worker_stats.py
  • tests/test_stale_cleanup.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/test_stale_cleanup.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: All default values must be defined exclusively within the Pydantic model - do not scatter fallback logic throughout the codebase
Do not perform null checks combined with hardcoded literals for default values - rely on the Pydantic schema to populate default values during initialization
You cannot use LUA scripts in Redis
If sensible, implement collections.abc interfaces for your classes, such as MutableMapping or MutableSequence
Use numpy style docstrings that are concise and to the point
Use type hints wherever possible - use list[int|float] | None instead of t.Optional[t.List[int|float]]
Imports should always be at the top of the file unless they affect startup time of ZnDraw and can be lazy loaded

Files:

  • src/zndraw/app/worker_stats.py
  • src/zndraw/app/cluster_heartbeat.py
  • src/zndraw/app/job_dispatcher.py
  • src/zndraw/app/stale_cleanup.py
🧬 Code graph analysis (2)
src/zndraw/app/worker_stats.py (2)
src/zndraw/app/cluster_heartbeat.py (2)
  • ClusterHeartbeat (26-176)
  • get_heartbeat_timestamp (130-145)
src/zndraw/app/redis_keys.py (1)
  • worker_capacity_key (124-133)
src/zndraw/app/cluster_heartbeat.py (1)
src/zndraw/app/redis_keys.py (1)
  • ClusterKeys (740-747)
🪛 Ruff (0.14.11)
src/zndraw/app/cluster_heartbeat.py

122-122: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: pytest (3.12, ubuntu-latest)
  • GitHub Check: pytest (3.11, ubuntu-latest)
  • GitHub Check: pytest (3.13, ubuntu-latest)
🔇 Additional comments (4)
src/zndraw/app/stale_cleanup.py (3)

127-147: Cleanup sequence looks complete.

Removes worker registry entries, reverse lookups, capacity keys, and active jobs as expected.


259-273: LGTM: category‑level cleanup orchestration.


193-200: No action needed — Redis client is guaranteed to have decode_responses=True.

The redis_client passed to cleanup_stale_filesystem_worker is sourced from app.extensions["redis"] (src/zndraw/server.py:59), which is initialized with decode_responses=True (src/zndraw/server.py:39). The hgetall call at line 189 will return string keys, not bytes, so the .get("registration_timestamp") lookup on line 194 works as expected. No bytes key fallback is required.

Likely an incorrect or invalid review comment.
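A quick way to see the decode_responses behaviour referenced above (plain redis-py; assumes a reachable Redis instance and a hypothetical demo key):

import redis

client = redis.Redis(decode_responses=True)
client.hset("fs:meta:demo", mapping={"registration_timestamp": "1700000000.0"})
meta = client.hgetall("fs:meta:demo")
# With decode_responses=True, keys and values come back as str, not bytes,
# so .get("registration_timestamp") works without any byte-key fallback.
assert meta.get("registration_timestamp") == "1700000000.0"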

src/zndraw/app/job_dispatcher.py (1)

75-76: No action required. The project's minimum Python version is 3.11 (specified in pyproject.toml as requires-python = ">=3.11"), which fully supports zip(..., strict=True) since that feature was introduced in Python 3.10.

Likely an incorrect or invalid review comment.
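A minimal illustration of the strict=True behaviour (plain Python, unrelated to the project code):

workers = ["w1", "w2"]
capacities = [3, 5]
print(list(zip(workers, capacities, strict=True)))  # [('w1', 3), ('w2', 5)]

# If the lists ever diverge in length, strict=True raises ValueError instead of
# silently truncating the longer list:
try:
    list(zip(workers, [3], strict=True))
except ValueError as err:
    print(err)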

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +53 to +59
for worker_id, reg_ts in all_workers.items():
    if heartbeat_ts is not None and float(reg_ts) < heartbeat_ts:
        # Worker is stale - skip it
        log.debug(
            f"Skipping stale worker {worker_id} (registered before heartbeat)"
        )
        continue


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's read the target file around lines 53-59
wc -l src/zndraw/app/job_dispatcher.py

Repository: zincware/ZnDraw

Length of output: 95


🏁 Script executed:

# Read the context around the specified lines
sed -n '40,75p' src/zndraw/app/job_dispatcher.py

Repository: zincware/ZnDraw

Length of output: 1258


🏁 Script executed:

# Get a broader understanding of the file structure
head -100 src/zndraw/app/job_dispatcher.py

Repository: zincware/ZnDraw

Length of output: 2933


🏁 Script executed:

# Search for where worker registration timestamps are written/set
rg "workers\[" src/zndraw --type py -A 3 -B 3

Repository: zincware/ZnDraw

Length of output: 503


🏁 Script executed:

# Search for worker registration logic
rg "register.*worker|worker.*register" src/zndraw --type py -i -l

Repository: zincware/ZnDraw

Length of output: 515


🏁 Script executed:

# Search for hset operations on workers key
rg "hset.*worker|workers" src/zndraw --type py -A 2 -B 2

Repository: zincware/ZnDraw

Length of output: 33070


🏁 Script executed:

# Find the utc_now_timestamp function
rg "def utc_now_timestamp" src/zndraw --type py -A 5

Repository: zincware/ZnDraw

Length of output: 324


🏁 Script executed:

# Check the utils/time module
cat src/zndraw/utils/time.py

Repository: zincware/ZnDraw

Length of output: 895


🏁 Script executed:

# Check the ClusterHeartbeat.is_worker_stale method
rg "def is_worker_stale" src/zndraw --type py -A 10

Repository: zincware/ZnDraw

Length of output: 886


🏁 Script executed:

# Also check if there are any other places where float(reg_ts) is called
rg "float\(.*ts\)" src/zndraw --type py

Repository: zincware/ZnDraw

Length of output: 383


🏁 Script executed:

# Get the full is_worker_stale method
sed -n '/def is_worker_stale/,/^    def /p' src/zndraw/app/cluster_heartbeat.py | head -30

Repository: zincware/ZnDraw

Length of output: 1136


🏁 Script executed:

# Also check what happens in stale_cleanup.py with is_worker_stale
rg "is_worker_stale" src/zndraw --type py -B 2 -A 2

Repository: zincware/ZnDraw

Length of output: 1010


🏁 Script executed:

# Check worker_stats.py to see if it has the same pattern
sed -n '/for worker_id, reg_ts in all_workers/,/continue/p' src/zndraw/app/worker_stats.py

Repository: zincware/ZnDraw

Length of output: 255


🏁 Script executed:

# Check if there are any tests that provide insight into this
fd "test.*worker" src/zndraw --type f | head -5

Repository: zincware/ZnDraw

Length of output: 41


🏁 Script executed:

# Look for any validation or schema checks for worker registration
rg "schema.*worker|worker.*schema" src/zndraw --type py -i -l | head -5

Repository: zincware/ZnDraw

Length of output: 145


🏁 Script executed:

# Check if there's any data migration or cleanup for legacy timestamp formats
rg "legacy|malformed|invalid.*timestamp" src/zndraw --type py -i

Repository: zincware/ZnDraw

Length of output: 314


🏁 Script executed:

# Check Redis keys and data validation in worker_routes
sed -n '/def register_worker/,/^def /p' src/zndraw/app/worker_routes.py | head -50

Repository: zincware/ZnDraw

Length of output: 1123


🏁 Script executed:

# Check the job_manager.py context for invalid timestamps
rg "Invalid timestamps" src/zndraw/app/job_manager.py -B 5 -A 5

Repository: zincware/ZnDraw

Length of output: 372


🏁 Script executed:

# Check if there are any database migrations or version checks
rg "migrate|version|upgrade" src/zndraw --type py -i -l | head -5

Repository: zincware/ZnDraw

Length of output: 180


🏁 Script executed:

# Look at the full worker registration code to see if timestamps could be stored as strings
sed -n '/def register_worker/,/^@workers\./p' src/zndraw/app/worker_routes.py | tail -80

Repository: zincware/ZnDraw

Length of output: 2853


Add defensive handling for malformed registration timestamps.

Redis hgetall() returns string values. While normal timestamps convert cleanly, malformed or legacy data could cause float(reg_ts) to raise an unhandled exception, aborting worker selection. Log and skip invalid entries to maintain dispatch resilience.

🛡️ Proposed guard
-    for worker_id, reg_ts in all_workers.items():
-        if heartbeat_ts is not None and float(reg_ts) < heartbeat_ts:
+    for worker_id, reg_ts in all_workers.items():
+        try:
+            reg_ts_val = float(reg_ts)
+        except (TypeError, ValueError):
+            log.warning(
+                f"Skipping worker {worker_id} with invalid registration timestamp: {reg_ts}"
+            )
+            continue
+        if heartbeat_ts is not None and reg_ts_val < heartbeat_ts:
             # Worker is stale - skip it
             log.debug(
                 f"Skipping stale worker {worker_id} (registered before heartbeat)"
             )
🤖 Prompt for AI Agents
In `@src/zndraw/app/job_dispatcher.py` around lines 53 - 59, The loop that
compares worker registration timestamps to heartbeat_ts in job_dispatcher.py can
raise on malformed reg_ts when calling float(reg_ts); wrap the conversion of
reg_ts to float in a try/except (handling ValueError/TypeError), log a warning
identifying worker_id and the bad reg_ts, and skip that worker (continue) so
invalid/malformed Redis values do not abort worker selection; update the block
around the for loop where worker_id, reg_ts are used to perform the guarded
conversion and subsequent comparison with heartbeat_ts.

Comment on lines +45 to +50
valid_workers = []
for worker_id, reg_ts in all_workers.items():
if heartbeat_ts is not None and float(reg_ts) < heartbeat_ts:
# Worker is stale - skip it
continue
valid_workers.append(worker_id)


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the file structure and find the specific lines
cat -n src/zndraw/app/worker_stats.py | head -70

Repository: zincware/ZnDraw

Length of output: 2817


🏁 Script executed:

# Let's also check what all_workers contains and where it comes from
rg -B 20 "for worker_id, reg_ts in all_workers" src/zndraw/app/worker_stats.py

Repository: zincware/ZnDraw

Length of output: 838


🏁 Script executed:

# Search for where all_workers is initialized or populated
rg "all_workers\s*=" src/zndraw/app/

Repository: zincware/ZnDraw

Length of output: 328


🏁 Script executed:

# Find where worker registration timestamps are set/written to Redis
rg "workers.*=" src/zndraw/app/ -A 3 | grep -E "(hset|set_worker|register)" -B 3 -A 3

Repository: zincware/ZnDraw

Length of output: 1237


🏁 Script executed:

# Check the redis_keys.py to understand the schema
cat src/zndraw/app/redis_keys.py | head -100

Repository: zincware/ZnDraw

Length of output: 3107


🏁 Script executed:

# Search for where workers are registered or their timestamps are set
rg "register.*worker|worker.*register" src/zndraw/ -i

Repository: zincware/ZnDraw

Length of output: 5665


🏁 Script executed:

# Check the register_worker function
cat src/zndraw/app/worker_routes.py | grep -n "register_worker" -A 80 | head -100

Repository: zincware/ZnDraw

Length of output: 2267


🏁 Script executed:

# Look for where the worker hash is populated (hset)
rg "hset.*worker|worker.*hset" src/zndraw/app/ -B 2 -A 2

Repository: zincware/ZnDraw

Length of output: 850


🏁 Script executed:

# Search for where registration timestamps are set
rg "time\(\)|timestamp\(\)|datetime" src/zndraw/app/worker_routes.py

Repository: zincware/ZnDraw

Length of output: 247


🏁 Script executed:

# Find where utc_now_timestamp is defined
rg "def utc_now_timestamp" src/

Repository: zincware/ZnDraw

Length of output: 117


🏁 Script executed:

# Also check imports in worker_routes to understand the source
head -30 src/zndraw/app/worker_routes.py

Repository: zincware/ZnDraw

Length of output: 861


🏁 Script executed:

# Check the implementation of utc_now_timestamp
cat src/zndraw/utils/time.py

Repository: zincware/ZnDraw

Length of output: 895


🏁 Script executed:

# Check how Redis stores the float and if there are any conversion issues
# Look for similar float conversions in the codebase
rg "float\(" src/zndraw/app/ -B 2 -A 2

Repository: zincware/ZnDraw

Length of output: 2963


🏁 Script executed:

# Check if there are any migration scripts or data format changes
rg "migration|migrate|legacy" src/zndraw/app/ -i

Repository: zincware/ZnDraw

Length of output: 164


🏁 Script executed:

# Verify if job_dispatcher.py (which also uses all_workers) has error handling
cat -n src/zndraw/app/job_dispatcher.py | grep -A 15 "all_workers = redis_client.hgetall"

Repository: zincware/ZnDraw

Length of output: 673


🏁 Script executed:

# Check cluster_heartbeat.py more carefully for how it handles conversions
cat src/zndraw/app/cluster_heartbeat.py | grep -A 5 -B 5 "is_worker_stale"

Repository: zincware/ZnDraw

Length of output: 469


🏁 Script executed:

# Check if there's any test or example data that might have malformed timestamps
rg "worker.*timestamp|timestamp.*worker" src/zndraw/ --type-list | head -20

Repository: zincware/ZnDraw

Length of output: 448


🏁 Script executed:

# Look at stale_cleanup.py to understand the legacy data issue
cat src/zndraw/app/stale_cleanup.py | grep -B 10 -A 10 "Legacy"

Repository: zincware/ZnDraw

Length of output: 721


🏁 Script executed:

# Check if is_worker_stale in cluster_heartbeat has error handling
cat src/zndraw/app/cluster_heartbeat.py | grep -A 15 "def is_worker_stale"

Repository: zincware/ZnDraw

Length of output: 655


🏁 Script executed:

# Search for any test failures or issues related to timestamp parsing
rg "float.*error|timestamp.*error|ValueError|TypeError" src/zndraw/ --type-list

Repository: zincware/ZnDraw

Length of output: 448


🏁 Script executed:

# Check if there are tests that verify the timestamp conversion logic
find src -name "*test*.py" -o -name "test_*" | head -5

Repository: zincware/ZnDraw

Length of output: 41


🏁 Script executed:

# Get the full is_worker_stale implementation
cat src/zndraw/app/cluster_heartbeat.py | grep -A 30 "def is_worker_stale"

Repository: zincware/ZnDraw

Length of output: 1136


🏁 Script executed:

# Let's check when is_worker_stale is actually called - does it have error handling at call sites?
rg "is_worker_stale" src/zndraw/app/ -B 2 -A 2

Repository: zincware/ZnDraw

Length of output: 1010


🏁 Script executed:

# Check if there's any version history or migration notes
cat CHANGELOG.md 2>/dev/null || cat HISTORY.md 2>/dev/null || echo "No changelog found"

Repository: zincware/ZnDraw

Length of output: 77


Add defensive handling for malformed worker registration timestamps.

The code calls float(reg_ts) without error handling. While current registrations use utc_now_timestamp(), defensive programming is warranted given the codebase acknowledges legacy/migrated data. Skip invalid entries to ensure stats retrieval remains resilient:

Suggested change
         for worker_id, reg_ts in all_workers.items():
+            try:
+                reg_ts_val = float(reg_ts)
+            except (TypeError, ValueError):
+                continue
-            if heartbeat_ts is not None and float(reg_ts) < heartbeat_ts:
+            if heartbeat_ts is not None and reg_ts_val < heartbeat_ts:
                 # Worker is stale - skip it
                 continue
             valid_workers.append(worker_id)

Note: The same pattern exists in job_dispatcher.py (line 54) and should be addressed there as well.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

# Before:
valid_workers = []
for worker_id, reg_ts in all_workers.items():
    if heartbeat_ts is not None and float(reg_ts) < heartbeat_ts:
        # Worker is stale - skip it
        continue
    valid_workers.append(worker_id)

# After:
valid_workers = []
for worker_id, reg_ts in all_workers.items():
    try:
        reg_ts_val = float(reg_ts)
    except (TypeError, ValueError):
        continue
    if heartbeat_ts is not None and reg_ts_val < heartbeat_ts:
        # Worker is stale - skip it
        continue
    valid_workers.append(worker_id)
🤖 Prompt for AI Agents
In `@src/zndraw/app/worker_stats.py` around lines 45 - 50, The loop that builds
valid_workers calls float(reg_ts) without guarding against malformed timestamps;
modify the loop (in the code handling all_workers and heartbeat_ts—the block
that appends to valid_workers) to wrap the float(reg_ts) conversion in a
try/except (catch ValueError/TypeError), skip that worker_id on failure, and
optionally emit a debug/warn log mentioning the worker_id and bad reg_ts; apply
the same defensive change to the equivalent timestamp conversion in
job_dispatcher.py (the similar block around line 54).
