-
Notifications
You must be signed in to change notification settings - Fork 5
Fix/assigned job timeout #858
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The client was storing _worker_id from server responses, but this caused a race condition during socket reconnection: 1. Socket reconnects with NEW sid 2. Server assigns pending job to NEW sid 3. Socket event handler receives job:assign 4. But _worker_id still holds OLD value (not yet updated) 5. Worker sends wrong worker_id to server → 400 BAD REQUEST The fix removes _worker_id entirely and always uses socket.sio.sid directly. This is safe because: - The server always assigns jobs to the socket's current sid - The socket that receives job:assign is always the one with that sid - socket.sio.sid always reflects the current connection Changes: - Remove _worker_id field from ZnDraw dataclass - Simplify sid property to return socket.sio.sid directly - Remove worker_id storage in socket_manager._register_extensions_after_join - Update tests to use vis.sid instead of vis._worker_id Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The register_extension() and register_filesystem() methods in api_manager.py were returning workerId from server responses, but callers no longer use this value after removing _worker_id storage. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Jobs stuck in ASSIGNED state for more than 30 seconds are now automatically failed during job listing. This handles cases where a worker disconnects before confirming the job. Changes: - Add cleanup_stale_assigned_jobs() to JobManager - Call cleanup lazily during list_active_jobs() - Add error and workerId fields to job API response - Convert Job class to dataclass with proper type hints - Add tests for timeout behavior Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
📝 WalkthroughWalkthroughAdds a 30-second timeout and lazy cleanup for ASSIGNED jobs during listing; converts Job to a dataclass with cached data and convenience predicates; removes returning Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client
participant JobRoutes as Job Routes
participant JobMgr as Job Manager
participant Redis as Redis
participant SocketIO as SocketIO
Client->>JobRoutes: GET /list_jobs
JobRoutes->>JobMgr: list_active_jobs(redis, room, socketio)
JobMgr->>JobMgr: cleanup_stale_assigned_jobs(...)
JobMgr->>Redis: fetch active jobs / assigned_at
Redis-->>JobMgr: [job entries]
alt job older than 30s
JobMgr->>JobMgr: fail_job(job_id)
JobMgr->>Redis: update job status -> FAILED
JobMgr->>SocketIO: emit job_failed(room, job_id)
SocketIO-->>Client: job_failed notification
end
JobMgr-->>JobRoutes: [active jobs]
JobRoutes-->>Client: 200 OK + job list
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~22 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/zndraw/app/job_manager.py`:
- Around line 487-494: The timeout check in cleanup_stale_assigned_jobs() is
using created_at instead of the time the job was assigned; update assign_job()
to record an assigned_at timestamp (e.g., set job_data["assigned_at"] when the
job transitions to ASSIGNED and persist it and/or update jobs_by_time score),
and then modify cleanup_stale_assigned_jobs() to read and parse assigned_at
(fallback to created_at only if assigned_at missing) to compute age_seconds;
alternatively, if you prefer score-based timing, update the jobs_by_time sorted
set score at assignment and use that score for staleness checks instead of
created_at.
🧹 Nitpick comments (3)
tests/test_job_endpoints.py (1)
142-142: Consider adding timeouts torequestscalls.Static analysis flagged multiple
requests.put()andrequests.get()calls without timeout parameters. While this is less critical for tests than production code, addingtimeout=10would prevent tests from hanging indefinitely if the server becomes unresponsive.Example fix for one call
response = requests.put( f"{server}/api/rooms/test/jobs/{job.job_id}/status", json={"status": "processing", "workerId": job.worker_id}, headers=get_jwt_auth_headers(server, "testuser"), + timeout=10, )Also applies to: 163-163, 168-168, 190-190, 229-229, 287-287, 354-354, 378-378
src/zndraw/app/job_manager.py (2)
506-507: Improve exception handling.Per static analysis hints:
- Catching bare
Exceptionis too broad—consider catching specific exceptions likeValueErrororParserErrorfrom dateutil- Use
log.exception()instead oflog.error()to include the traceback♻️ Proposed fix
- except Exception as e: - log.error(f"Error checking job {job_id} for timeout: {e}") + except (ValueError, OverflowError) as e: + log.exception(f"Error checking job {job_id} for timeout: {e}")
534-556: Lazy cleanup on every list operation.The cleanup runs on every call to
list_active_jobs, iterating all active jobs twice (cleanup + listing). This is acceptable for moderate job volumes but could become a performance concern if:
- The room has many active jobs
list_active_jobsis called frequently (e.g., polling)Consider adding a timestamp check to skip cleanup if it ran recently (e.g., within the last 10 seconds), or moving to a scheduled background cleanup.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
src/zndraw/api_manager.pysrc/zndraw/app/job_manager.pysrc/zndraw/app/job_routes.pysrc/zndraw/job.pysrc/zndraw/socket_manager.pysrc/zndraw/zndraw.pytests/test_job_endpoints.py
💤 Files with no reviewable changes (1)
- src/zndraw/api_manager.py
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: All default values must be defined exclusively within the Pydantic model - do not scatter fallback logic throughout the codebase
Do not perform null checks combined with hardcoded literals for default values - rely on the Pydantic schema to populate default values during initialization
You cannot use LUA scripts in Redis
If sensible, implement collections.abc interfaces for your classes, such as MutableMapping or MutableSequence
Use numpy style docstrings that are concise and to the point
Use type hints wherever possible - uselist[int|float] | Noneinstead oft.Optional[t.List[int|float]]
Imports should always be at the top of the file unless they affect startup time of ZnDraw and can be lazy loaded
Files:
src/zndraw/socket_manager.pysrc/zndraw/app/job_manager.pytests/test_job_endpoints.pysrc/zndraw/app/job_routes.pysrc/zndraw/zndraw.pysrc/zndraw/job.py
**/test_*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/test_*.py: Do not use@pytest.mark.xfailor similar - all tests must pass
When designing new tests, read the old tests first to understand the existing patterns
Usepytest.mark.parametrizeto avoid code duplication in tests
Tests should be very specific and test only one thing
Avoid complex test setups
Each test must be a function, not a method of a class
Files:
tests/test_job_endpoints.py
🧬 Code graph analysis (6)
src/zndraw/socket_manager.py (3)
src/zndraw/zndraw.py (2)
register_extension(1519-1581)register_filesystem(1630-1704)src/zndraw/api_manager.py (2)
register_extension(319-375)register_filesystem(377-429)src/zndraw/app/worker_routes.py (1)
register_filesystem(303-487)
src/zndraw/app/job_manager.py (2)
src/zndraw/app/redis_keys.py (5)
RoomKeys(242-550)jobs_active(424-426)JobKeys(673-680)hash_key(570-572)hash_key(678-680)src/zndraw/utils/time.py (1)
utc_now_timestamp(33-41)
tests/test_job_endpoints.py (1)
src/zndraw/job.py (2)
worker_id(107-115)is_assigned(75-77)
src/zndraw/app/job_routes.py (1)
src/zndraw/app/job_manager.py (2)
JobManager(134-707)list_all_jobs(619-640)
src/zndraw/zndraw.py (3)
app/src/socket.ts (1)
socket(7-15)src/zndraw/api_manager.py (2)
register_extension(319-375)register_filesystem(377-429)src/zndraw/app/worker_routes.py (1)
register_filesystem(303-487)
src/zndraw/job.py (2)
src/zndraw/app/job_manager.py (1)
JobStatus(90-97)app/src/types/jobs.ts (1)
JobStatus(4-9)
🪛 Ruff (0.14.11)
src/zndraw/app/job_manager.py
506-506: Do not catch blind exception: Exception
(BLE001)
507-507: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
tests/test_job_endpoints.py
142-142: Probable use of requests call without timeout
(S113)
163-163: Probable use of requests call without timeout
(S113)
168-168: Probable use of requests call without timeout
(S113)
190-190: Probable use of requests call without timeout
(S113)
229-229: Probable use of requests call without timeout
(S113)
287-287: Probable use of requests call without timeout
(S113)
354-354: Probable use of requests call without timeout
(S113)
378-378: Probable use of requests call without timeout
(S113)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: pytest (3.13, ubuntu-latest)
- GitHub Check: pytest (3.11, ubuntu-latest)
- GitHub Check: pytest (3.12, ubuntu-latest)
🔇 Additional comments (17)
src/zndraw/job.py (5)
36-41: LGTM on dataclass conversion.The dataclass pattern with
field(default_factory=dict, repr=False)for internal cache is clean. TheAnytypes forapiandsocketare acceptable given they're implementation details.
54-58: Lazy caching pattern looks good.The
_ensure_cached()helper correctly handles the lazy loading scenario. However, note that an empty dict{}is falsy, so if the API ever returns an empty dict, it would trigger unnecessary refetches. This is unlikely in practice since job data always has at least anidfield.
95-115: New properties align with API response structure.The
errorandworker_idproperties correctly useor Noneto convert empty strings toNone, andworker_iduses"workerId"(camelCase) matching the API response format fromjob_routes.py.
147-149: Simplified__repr__is terminal-friendly.The simplified representation provides essential information without cluttering output.
71-93: Status predicates correctly useJobStatusenum for comparison.The implementation is sound.
self.statusreturns a string from cached data, and sinceJobStatus(str, Enum)makes enum members string instances, comparisons likeself.status == JobStatus.PENDINGwork as intended. All referenced enum values (PENDING,ASSIGNED,PROCESSING,COMPLETED,FAILED) are defined and match the expected status field values.src/zndraw/socket_manager.py (1)
157-185: LGTM on removing worker_id capture.The removal of worker_id capture from registration calls aligns with the PR's shift to using socket session ID (
sid) for worker identification. The server still returnsworkerIdin responses (as seen inworker_routes.py), but the client no longer needs to store it locally sinceself.zndraw.sid(fromsocket.sio.sid) is now the canonical identifier.src/zndraw/app/job_routes.py (2)
121-129: LGTM on lazy cleanup integration.The endpoint now correctly passes
socketiotoJobManager.list_all_jobs()to enable lazy cleanup of stale ASSIGNED jobs. The docstring update mentioning lazy cleanup is helpful.
197-198: LGTM on extended response fields.Adding
errorandworkerIdto the response enables the client-sideJobobject's new properties. Theor Nonepattern correctly converts empty strings toNonefor cleaner API responses.src/zndraw/zndraw.py (3)
685-697: LGTM on simplifiedsidproperty.The property now directly returns
self.socket.sio.sid, which is the actual socket session ID. This is cleaner than storing a separate_worker_idthat the server returned. The docstring correctly explains this is the canonical identifier for job assignment and worker tracking.
1574-1581: LGTM on register_extension update.The method no longer captures a return value from
api.register_extension()since the server now manages worker-to-session mapping internally. Logging withself.sidprovides the correct identifier for debugging.
1695-1704: LGTM on register_filesystem update.Consistent with the extension registration changes - no longer captures worker_id, logs with
self.sid.tests/test_job_endpoints.py (4)
140-146: Correct use ofjob.worker_idfor status updates.Using
job.worker_id(the worker assigned to execute the job) instead ofvis.sid(the client's session) is semantically correct. The job's assigned worker should be the one reporting status changes.
159-171: LGTM on capturing worker_id before status transitions.Storing
worker_id = job.worker_idearly ensures consistency across multiple status update requests for the same job.
332-364: Well-designed test for stale ASSIGNED job cleanup.The test correctly:
- Creates a job in ASSIGNED state
- Artificially ages it via Redis manipulation
- Triggers cleanup via list endpoint
- Verifies the job transitions to FAILED with appropriate error message
Good coverage of the new timeout mechanism.
367-386: Good edge case coverage for non-stale jobs.This test ensures that fresh ASSIGNED jobs are not incorrectly failed by the cleanup mechanism - important to prevent false positives.
src/zndraw/app/job_manager.py (2)
18-20: LGTM!The timeout constant is well-documented and the 30-second threshold is reasonable for detecting workers that failed to confirm job processing.
618-640: LGTM!The method correctly delegates to
list_active_jobsto ensure stale job cleanup occurs, and the docstring follows numpy style as per guidelines.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #858 +/- ##
==========================================
- Coverage 79.79% 79.77% -0.03%
==========================================
Files 165 165
Lines 20205 20272 +67
==========================================
+ Hits 16123 16171 +48
- Misses 4082 4101 +19 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
- Add assigned_at timestamp when job transitions to ASSIGNED state - Update cleanup_stale_assigned_jobs() to use assigned_at (with fallback to created_at for backwards compatibility) - Update test to verify assigned_at is used for timeout calculation This fixes the issue where a job that waited in PENDING state would incorrectly timeout immediately upon assignment. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary by CodeRabbit
New Features
Bug Fixes
Other Changes
✏️ Tip: You can customize this high-level summary in your review settings.