Step 1: Graceful Exception Handling and Recovery#18
Step 1: Graceful Exception Handling and Recovery#18
Conversation
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
scngupta-dsp
left a comment
There was a problem hiding this comment.
Let us discuss when you have some time
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
There was a problem hiding this comment.
Pull Request Overview
This PR improves error handling and status reporting throughout the workflow by introducing graceful exception handling, extended flow state information (including exceptions), and updated logging. Key changes include:
- Adding try/except blocks in director_server to report errors gracefully.
- Updating GetFlowState methods and related proto messages to include an exception field.
- Refactoring Experiment and Aggregator components to utilize a new ExperimentStatus structure for detailed status and error trace propagation.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| openfl/experimental/workflow/transport/grpc/director_server.py | Adds exception handling in GetExperimentData using try/except and reports errors via context.abort. |
| openfl/experimental/workflow/transport/grpc/director_client.py | Modifies get_flow_state to include exception details in its return value. |
| openfl/experimental/workflow/runtime/federated_runtime.py | Updates get_flow_state to handle the new triplet (status, flow object, exception). |
| openfl/experimental/workflow/protocols/director.proto | Introduces a new exception field to GetFlowStateResponse. |
| openfl/experimental/workflow/interface/fl_spec.py | Enhances error messaging in _run_federated by building a contextual error message. |
| openfl/experimental/workflow/component/envoy/envoy.py | Improves logging and error handling during experiment data retrieval and collaborator execution. |
| openfl/experimental/workflow/component/director/experiment.py | Refactors experiment status handling using the new ExperimentStatus dataclass and returns a detailed status dict. |
| openfl/experimental/workflow/component/director/director.py | Adjusts experiment waiting and flow state retrieval to align with updated API contracts. |
| openfl/experimental/workflow/component/aggregator/aggregator.py | Refactors run_flow with new helper methods for better flow initialization and collaborator task management. |
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
There was a problem hiding this comment.
Pull Request Overview
This PR introduces graceful exception handling and improved error reporting across the federation experiment workflow. Key changes include:
- Wrapping experiment data streaming in director_server.py and updating GetFlowState responses to include exceptions.
- Propagating exception details through FederatedRuntime, Experiment, and director client components.
- Refactoring aggregator and director components to improve flow state tracking and status updates.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| openfl/experimental/workflow/transport/grpc/director_server.py | Adds try/except block for GetExperimentData and updates GetFlowState to include exception details. |
| openfl/experimental/workflow/transport/grpc/director_client.py | Updates get_flow_state return signature to include an exception value. |
| openfl/experimental/workflow/runtime/federated_runtime.py | Adjusts get_flow_state method to process three return values. |
| openfl/experimental/workflow/interface/fl_spec.py | Improves exception handling in the federated runtime flow setup. |
| openfl/experimental/workflow/component/envoy/envoy.py | Reorders data stream handling to better separate exception handling logic. |
| openfl/experimental/workflow/component/director/experiment.py | Refactors experiment status tracking and error reporting using a new ExperimentStatus dataclass. |
| openfl/experimental/workflow/component/director/director.py | Modifies get_flow_state and experiment waiting logic with clearer state checks. |
| openfl/experimental/workflow/component/aggregator/aggregator.py | Replaces the old run_flow implementation and introduces helper methods for state initialization and collaborator queue handling. |
Comments suppressed due to low confidence (2)
openfl/experimental/workflow/component/director/director.py:226
- Avoid using magic numbers (like '4') when checking the experiment status. Use the corresponding enum constant (e.g., Status.FAILED) for improved clarity and maintainability.
if experiment.experiment_status.status.value == 4:
openfl/experimental/workflow/component/director/experiment.py:204
- [nitpick] Consider providing a more descriptive error message that includes contextual details about the failure point, as the current message might be ambiguous when exp_name is None.
raise Exception(f"{error_msg} due to error: {e}") from e
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
scngupta-dsp
left a comment
There was a problem hiding this comment.
Some minor comments. Overall looks good Ishant. Few suggestions:
- Can we have a relook into the Director / Aggregator / Experiment code and Envoy code to ensure that our changes do not look like patchwork
- We also need to run a mental simulation to ensure that Phase 2 can be introduced seamlessly
Let us discuss
openfl/experimental/workflow/component/aggregator/aggregator.py
Outdated
Show resolved
Hide resolved
openfl/experimental/workflow/component/aggregator/aggregator.py
Outdated
Show resolved
Hide resolved
openfl/experimental/workflow/component/aggregator/aggregator.py
Outdated
Show resolved
Hide resolved
openfl/experimental/workflow/component/aggregator/aggregator.py
Outdated
Show resolved
Hide resolved
| def _restore_instance_snapshot(self) -> None: | ||
| """Restore instance snapshot if it exists.""" | ||
| if hasattr(self, "instance_snapshot"): | ||
| self.flow.restore_instance_snapshot(self.flow, list(self.instance_snapshot)) |
There was a problem hiding this comment.
Pls check if we need the check. FLSpec.restore_instance_snapshot already checks whether there is a backup
There was a problem hiding this comment.
Can your loop be optimized for readability and efficiency by avoiding repeated dictionary lookups? And instead of k and V can we give them more generic name for readability ?
There was a problem hiding this comment.
You're right about renaming k and v for better readability — I’ll update them to more meaningful names like collaborator and task_queue.
Regarding the optimization: the loop is already efficient as it avoids unnecessary operations by checking membership in selected_collaborators before putting the task into the queue. The dictionary lookup (self.__collaborator_tasks_queue.items()) is performed only once at the beginning, and each key is accessed only once per iteration. So, performance-wise, the loop is already optimal.
| self.experiment_status.update_experiment_status(Status.IN_PROGRESS) | ||
| logger.info(f"New experiment {self.name} for collaborators {self.collaborators}") |
There was a problem hiding this comment.
Can we make ExperimentStatus a sub-class of Experiment or something more object oriented / intuitive ? Please evaluate
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
| @@ -529,76 +557,3 @@ def valid_collaborator_cn_and_id( | |||
| def all_quit_jobs_sent(self) -> bool: | |||
| """Assert all quit jobs are sent to collaborators.""" | |||
There was a problem hiding this comment.
The current docstring is grammatically understandable but could be made clearer and more descriptive. Example """Check whether quit jobs have been sent to all authorized collaborators."""
Use "Check whether..." instead of "Assert..." to reflect that it's a boolean check, not an assert statement.
Clarifies that it's about authorized collaborators, which adds context.
| self._aggregator_grpc_server = None | ||
| self.aggregator = None | ||
| self.updated_flow = None | ||
| self.experiment_exception_trace = None |
There was a problem hiding this comment.
experiment_exception_trace can be removed if we are not using it anywhere
| f"\033[91m{exception}\033[0m", | ||
| ) | ||
| return flspec_obj | ||
|
|
There was a problem hiding this comment.
Can we use some constants or something more readable for color coding
There was a problem hiding this comment.
Yes, that makes sense. Similarly, since color constants are used throughout OpenFL, we can create a dedicated utility module for them. This would promote consistency and reusability across the codebase. I suggest we consider implementing this in a future iteration.
Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
Summary
Background: Enhance Aggregator and Collaborators to gracefully catch exceptions, allowing the respective Director and Envoys to return to the wait_for_experiment state.
Type of Change (Mandatory)
Specify the type of change being made.
Description (Mandatory)
This PR introduces graceful exception handling and improved error reporting across the federation experiment workflow. Key changes include:
aggregator,experimentanddirectorcomponents to improve flow state tracking and status updates.GetFlowStateresponses to include exceptions.Testing
Additional Information
Files modified:
openfl/experimental/workflow/component/aggregator/aggregator.pyopenfl/experimental/workflow/component/director/director.pyopenfl/experimental/workflow/component/director/experiment.pyopenfl/experimental/workflow/component/envoy/envoy.pyopenfl/experimental/workflow/interface/fl_spec.pyopenfl/experimental/workflow/protocols/director.protoopenfl/experimental/workflow/runtime/federated_runtime.pyopenfl/experimental/workflow/transport/grpc/director_client.pyopenfl/experimental/workflow/transport/grpc/director_server.py