Add retry mechanism and improved error handling for processing services #1022
Conversation
Walkthrough
Processing pipeline error handling now uses a centralized extractor for HTTP responses; processing service readiness checks use a retry-enabled session with a 90s timeout; pipeline service selection uses tracked latency/live attributes to pick the lowest-latency online service.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Pipeline as Pipeline.process_images
    participant Service as ProcessingService
    participant Session as HTTP Session (retry)
    participant Extract as extract_error_message_from_response
    rect rgb(240,248,255)
        Note over Pipeline,Service: Service selection (uses last_checked_live & last_checked_latency)
        Pipeline->>Service: request status (get_status via session)
        Service->>Session: GET /status (retry-enabled, timeout=90s)
        Session-->>Service: status response
    end
```

```mermaid
sequenceDiagram
    participant Pipeline as Pipeline.process_images
    participant Session as HTTP Session (retry)
    participant Service as Processing endpoint
    participant Extract as extract_error_message_from_response
    rect rgb(245,245,220)
        Pipeline->>Session: POST /process (with request_data)
        Session->>Service: HTTP request
        alt 200 OK
            Service-->>Session: 200 response
            Session-->>Pipeline: success payload
        else non-OK
            Service-->>Session: non-OK response
            Session-->>Pipeline: response
            Pipeline->>Extract: extract_error_message_from_response(resp)
            Extract-->>Pipeline: error_msg (status | detail / kv / text / content)
            Pipeline->>Pipeline: msg = f"Failed to process {request_data.summary()}: {error_msg}"
            Pipeline-->>Pipeline: log and raise HTTPError (if no job context) / attach to job.logs
        end
    end
```
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Areas requiring attention:
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
🔍 Existing Issues For Review
Your pull request is modifying functions with the following pre-existing issues:
📄 File: ami/ml/models/pipeline.py
Actionable comments posted: 0
🧹 Nitpick comments (1)
ami/utils/tests.py (1)
39-60: Solid test coverage for the error extraction helper.
The test method covers the three primary scenarios: standard `detail` field extraction, fallback to non-standard fields, and fallback to text when JSON parsing fails. The use of `Mock(spec=requests.Response)` is appropriate.
Consider adding a fourth test case to exercise the raw bytes fallback at line 80 of `extract_error_message_from_response` (when both JSON parsing and `resp.text` fail):

```python
# Test fallback to raw bytes when text access fails
mock_response.json.side_effect = ValueError("No JSON")
mock_response.text = property(lambda self: (_ for _ in ()).throw(Exception("text error")))
mock_response.content = b"Raw error bytes"
result = extract_error_message_from_response(mock_response)
self.assertIn("Response content: b'Raw error bytes'", result)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
ami/ml/models/pipeline.py (3 hunks)
ami/ml/models/processing_service.py (4 hunks)
ami/utils/requests.py (1 hunks)
ami/utils/tests.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ami/ml/models/pipeline.py (2)
ami/utils/requests.py (2)
create_session (14-41), extract_error_message_from_response (44-82)
ami/jobs/models.py (1)
logger(989-998)
ami/ml/models/processing_service.py (2)
ami/utils/requests.py (1)
create_session (14-41)
ami/ml/schemas.py (1)
ProcessingServiceStatusResponse(245-257)
ami/utils/tests.py (1)
ami/utils/requests.py (1)
extract_error_message_from_response(44-82)
🪛 Ruff (0.14.2)
ami/utils/requests.py
78-78: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (4)
ami/utils/requests.py (1)
44-82: LGTM! Solid error extraction with appropriate fallbacks.
The function correctly prioritizes the FastAPI "detail" field, falls back gracefully through multiple alternatives, and limits output to 500 characters to prevent log overflow. The broad `Exception` catch at line 78 (flagged by static analysis) is acceptable here as a final fallback when extracting `resp.text` fails; this ensures the function always returns a usable error message even in edge cases like encoding issues.
ami/ml/models/processing_service.py (1)
141-211: Well-designed retry mechanism for serverless cold starts.
The retry configuration (3 retries, 2s backoff, 90s timeout) appropriately handles transient failures and cold-start delays. The increased timeout from 6s to 90s makes sense for services loading multiple models into memory. The assertion at line 194 is safe since it only executes when `last_checked_live=True` (request succeeded).
ami/ml/models/pipeline.py (2)
245-251: Improved error handling with centralized message extraction.
The use of `extract_error_message_from_response(resp)` provides consistent, detailed error messages. The "Processing service request failed: " prefix makes it clear where errors originate in logs.
1041-1088: Cache staleness check is missing but acceptable given refresh interval; implement TODO in future.
The periodic task `check_processing_services_online()` runs every 5 minutes and refreshes the cached `last_checked_live` and `last_checked_latency` fields for all services. The method correctly uses these cached values to avoid redundant health checks.
However, the TODO at line 1044 identifies a gap: the method does not validate the max age of cached data before selecting a service. Currently, `last_checked` timestamps are recorded but never validated. A 5-minute maximum staleness is acceptable for typical workloads, and the retry mechanism in `processing_service.py` would handle transient failures from stale service selection.
Implementing the max age check is a reasonable future improvement but is not critical for this change. Document or track this as a follow-up.
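If the follow-up is implemented, a staleness guard might look something like the sketch below; the `MAX_STATUS_AGE` value and the exact timestamp field name are assumptions, not part of the current models:

```python
from datetime import timedelta

from django.utils import timezone

# Hypothetical maximum age for cached health-check data (not defined in the current code)
MAX_STATUS_AGE = timedelta(minutes=10)


def status_is_fresh(processing_service) -> bool:
    """Return True if the cached liveness/latency data is recent enough to trust (sketch)."""
    last_checked = processing_service.last_checked  # assumed timestamp field
    return last_checked is not None and (timezone.now() - last_checked) < MAX_STATUS_AGE
```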
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/ml/models/pipeline.py (1)
1063-1090: Fix UnboundLocalError when all services are online but have no latency data.
If all processing services have `last_checked_live=True` but none have a valid `last_checked_latency` value, the variable `processing_service_lowest_latency` will never be assigned. This causes an `UnboundLocalError` when the code tries to log and return it on lines 1086-1090.
Apply this diff to ensure a service is always selected when services are online:

```diff
 # check the status of all processing services and pick the one with the lowest latency
 lowest_latency = float("inf")
 processing_services_online = False
+processing_service_lowest_latency = None
 for processing_service in processing_services:
     if processing_service.last_checked_live:
         processing_services_online = True
         if (
             processing_service.last_checked_latency
             and processing_service.last_checked_latency < lowest_latency
         ):
             lowest_latency = processing_service.last_checked_latency
             # pick the processing service that has lowest latency
             processing_service_lowest_latency = processing_service
+        elif processing_service_lowest_latency is None:
+            # Fallback: pick the first online service if no latency data available
+            processing_service_lowest_latency = processing_service
 # if all offline then throw error
 if not processing_services_online:
     msg = f'No processing services are online for the pipeline "{pipeline_name}".'
     task_logger.error(msg)
     raise Exception(msg)
 else:
+    assert processing_service_lowest_latency is not None, "No service selected despite being online"
     task_logger.info(
         f"Using processing service with latency {round(lowest_latency, 4)}: "
         f"{processing_service_lowest_latency}"
     )
     return processing_service_lowest_latency
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
ami/ml/models/pipeline.py (3 hunks)
ami/ml/schemas.py (1 hunks)
ami/ml/tests.py (1 hunks)
ami/utils/tests.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
ami/ml/schemas.py (2)
ui/src/data-services/models/occurrence-details.ts (1)
detections (108-110)
ui/src/data-services/models/job.ts (1)
pipeline(109-111)
ami/ml/models/pipeline.py (2)
ami/utils/requests.py (2)
create_session (14-41), extract_error_message_from_response (44-82)
ami/ml/schemas.py (1)
summary(179-196)
ami/utils/tests.py (1)
ami/utils/requests.py (1)
extract_error_message_from_response(44-82)
ami/ml/tests.py (3)
ami/jobs/models.py (2)
Job (719-1004), save (939-950)
ami/tests/fixtures/main.py (3)
setup_test_project (114-131), create_captures_from_files (171-203), create_processing_service (42-71)
ami/ml/models/pipeline.py (3)
save (1116-1122), process_images (163-278), process_images (1092-1106)
🪛 Ruff (0.14.2)
ami/utils/tests.py
64-64: Unused lambda argument: self
(ARG005)
ami/ml/tests.py
137-137: Probable insecure usage of temporary file or directory: "/tmp/nonexistent_image.jpg"
(S108)
139-139: Consider [error_image, *test_images[1:2]] instead of concatenation
(RUF005)
144-145: try-except-pass detected, consider logging the exception
(S110)
144-144: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (5)
ami/ml/schemas.py (1)
179-196: LGTM!
The `summary()` method provides clear, human-friendly request summaries with proper null handling and pluralization. This will improve error messages throughout the pipeline.
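As a rough illustration of such a method (the field names and Pydantic base class are assumptions, not the actual schema), a request summary might be produced like this:

```python
from pydantic import BaseModel


class PipelineRequestExample(BaseModel):
    """Illustrative stand-in for the real pipeline request schema."""

    pipeline: str | None = None
    source_images: list = []
    detections: list = []

    def summary(self) -> str:
        """Human-friendly description used in error messages (sketch)."""
        images = len(self.source_images or [])
        detections = len(self.detections or [])
        suffix = f" to pipeline '{self.pipeline}'" if self.pipeline else ""
        return (
            f"pipeline request with {images} image{'s' if images != 1 else ''} "
            f"and {detections} detection{'s' if detections != 1 else ''}{suffix}"
        )
```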
ami/utils/tests.py (1)
39-61: LGTM!
The test cases correctly validate the error message extraction logic for JSON detail fields, non-standard fields, and text fallback scenarios.
ami/ml/tests.py (1)
120-152: LGTM!
The test correctly validates that processing service errors are captured in job logs with the expected "Failed to process" message format. The use of `/tmp/nonexistent_image.jpg` and bare exception handling are appropriate for testing error scenarios.
ami/ml/models/pipeline.py (2)
54-54: LGTM!
Good addition of the centralized error message extractor.
244-247: LGTM!
The improved error handling provides clear, actionable error messages by combining the request summary with detailed error information from the response.
```python
# Test fallback to raw bytes when text access fails
mock_response.json.side_effect = ValueError("404 Not Found: Could not fetch image")
mock_response.text = property(lambda self: (_ for _ in ()).throw(Exception("text error")))
mock_response.content = b"Raw error bytes"
result = extract_error_message_from_response(mock_response)
self.assertIn("Response content: b'Raw error bytes'", result)
```
Fix the mock property simulation for text access failure.
The current implementation assigns a property object directly to mock_response.text, which won't cause an exception when the attribute is accessed. The Mock framework doesn't interpret this as a property descriptor.
Apply this diff to properly simulate the text property raising an exception:
- # Test fallback to raw bytes when text access fails
- mock_response.json.side_effect = ValueError("404 Not Found: Could not fetch image")
- mock_response.text = property(lambda self: (_ for _ in ()).throw(Exception("text error")))
- mock_response.content = b"Raw error bytes"
- result = extract_error_message_from_response(mock_response)
- self.assertIn("Response content: b'Raw error bytes'", result)
+ # Test fallback to raw bytes when text access fails
+ mock_response.json.side_effect = ValueError("404 Not Found: Could not fetch image")
+ type(mock_response).text = property(lambda self: (_ for _ in ()).throw(Exception("text error")))
+ mock_response.content = b"Raw error bytes"
+ result = extract_error_message_from_response(mock_response)
+ self.assertIn("Response content: b'Raw error bytes'", result)Alternatively, use PropertyMock:
+ from unittest.mock import PropertyMock
+
# Test fallback to raw bytes when text access fails
mock_response.json.side_effect = ValueError("404 Not Found: Could not fetch image")
- mock_response.text = property(lambda self: (_ for _ in ()).throw(Exception("text error")))
+ type(mock_response).text = PropertyMock(side_effect=Exception("text error"))
mock_response.content = b"Raw error bytes"
result = extract_error_message_from_response(mock_response)
self.assertIn("Response content: b'Raw error bytes'", result)🧰 Tools
🪛 Ruff (0.14.2)
64-64: Unused lambda argument: self
(ARG005)
🤖 Prompt for AI Agents
In ami/utils/tests.py around lines 62 to 67, the test attempts to simulate a
failing .text attribute by assigning a property object directly to
mock_response.text, which doesn't raise on attribute access; replace that with a
proper PropertyMock (or use patch.object on the mock's class) that has
side_effect=Exception("text error") so accessing mock_response.text raises the
exception and the fallback to .content is exercised.
Summary
This PR adds retry logic and improved error handling for processing service health checks and ML pipeline requests. These are safe, non-breaking changes extracted from PR #981 to improve reliability when communicating with external processing services.
Details
Offline processing services
Currently the health check for a processing service fails very easily. As soon as you add a service it goes to offline status, and when you go to process images for the first time in a while, it goes to offline status again. This makes most jobs fail the first time you try them, and periodically within the job.
Now the health check better handles cold starts and retries the check on failure. Furthermore, it doesn't check the health before every image batch is sent; it just relies on the periodic check in the background.
Error messages from the processing service in the job logs
Currently if a single image or image batch fails in an unhandled way on the processing service, a very boring message is displayed in Antenna.
Now it's a little better:
Failed to process pipeline request with 2 images and 0 detections to pipeline 'constant': HTTP 500: Internal Server Error | Response text: Internal Server Error

The message is still limited because in the deep context where it is thrown we still don't know which job, which batch number, or what error happened on the processing service side. But it's a little better. The next step is to update the processing services to better catch deeper errors in PyTorch / the ML processing and translate them into better error messages in their API response -- but this will depend on the maintainers of each processing service.
Motivation
External processing services (especially serverless ones) can experience:
This PR addresses these issues with automatic retries and better error reporting.
Changes
1. Retry Mechanism for Health Checks (5c635c4)
File: ami/ml/models/processing_service.py
- Adds `Retry` with exponential backoff to `ProcessingService.get_status()` (see the sketch below)
Benefits:
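As an illustration of the retry behavior described above, a retry-enabled session could be built roughly like the sketch below. The real `create_session()` helper in ami/utils/requests.py may differ in signature and defaults; the status codes and backoff values here are assumptions based on the review comments.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def create_session(retries: int = 3, backoff_factor: float = 2.0) -> requests.Session:
    """Return a requests session that retries transient failures (illustrative sketch)."""
    retry = Retry(
        total=retries,
        backoff_factor=backoff_factor,  # exponential backoff between attempts
        status_forcelist=[502, 503, 504],  # typical gateway/cold-start errors (assumed)
        allowed_methods=["GET", "POST"],
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


# Example health check with the longer 90s timeout for cold starts:
# resp = create_session().get(f"{processing_service_url}/status", timeout=90)
```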
2. Use Cached Status Before Sending Images (fb6fdc6)
File: ami/ml/models/pipeline.py
- Uses the cached `last_checked_live` and `last_checked_latency` fields instead of calling `get_status()` every time (see the sketch below)
Benefits:
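A minimal sketch of the selection logic, assuming service objects that expose the cached `last_checked_live` and `last_checked_latency` attributes; this mirrors the behavior described in the review but is not the exact project code:

```python
def choose_processing_service(processing_services):
    """Pick the online service with the lowest cached latency (illustrative sketch)."""
    online = [s for s in processing_services if s.last_checked_live]
    if not online:
        raise Exception("No processing services are online for this pipeline.")
    # Services with no recorded latency sort last but remain eligible as a fallback.
    return min(online, key=lambda s: s.last_checked_latency or float("inf"))
```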
3. Better Error Messages from Processing Services (d3f5839)
File: ami/utils/requests.py
- New `extract_error_message_from_response()` utility function (see the sketch below)
- Prioritizes the `detail` field (FastAPI standard), falls back to full JSON, text, or raw bytes
File: ami/ml/models/pipeline.py
- Updates `process_images()` to log clearer error messages
File: ami/utils/tests.py
- Tests for `extract_error_message_from_response()`
Benefits:
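For reference, the fallback chain could look roughly like the sketch below; the actual `extract_error_message_from_response()` may differ in details such as the exact message format and the 500-character truncation mentioned in the review:

```python
import requests


def extract_error_message_from_response(resp: requests.Response, max_length: int = 500) -> str:
    """Build a readable error message from a failed HTTP response (illustrative sketch)."""
    prefix = f"HTTP {resp.status_code}: {resp.reason}"
    try:
        data = resp.json()
        # FastAPI puts error details in a "detail" field; fall back to the whole body.
        detail = data.get("detail", data) if isinstance(data, dict) else data
        message = f"{prefix} | {detail}"
    except ValueError:
        try:
            message = f"{prefix} | Response text: {resp.text}"
        except Exception:
            # Last resort: raw bytes, e.g. when decoding the body fails.
            message = f"{prefix} | Response content: {resp.content!r}"
    return message[:max_length]
```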
Testing
These commits were cherry-picked from PR #981 where they have been tested. The changes are:
To test manually:
Related