diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 6ea65b863a..5821b660f3 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -2747,8 +2747,11 @@ def sync_node_execution( # This is the plain ol' task execution case else: execution._task_executions = [ + # Sync task execution but only get inputs/outputs if the overall execution is done self.sync_task_execution( - FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task.interface + FlyteTaskExecution.promote_from_model(t), + node_mapping[node_id].task_node.flyte_task.interface, + get_task_exec_data=execution.is_done, ) for t in iterate_task_executions(self.client, execution.id) ] @@ -2763,16 +2766,26 @@ def sync_node_execution( return execution def sync_task_execution( - self, execution: FlyteTaskExecution, entity_interface: typing.Optional[TypedInterface] = None + self, + execution: FlyteTaskExecution, + entity_interface: typing.Optional[TypedInterface] = None, + get_task_exec_data: bool = True, ) -> FlyteTaskExecution: """Sync a FlyteTaskExecution object with its corresponding remote state.""" + execution._closure = self.client.get_task_execution(execution.id).closure - execution_data = self.client.get_task_execution_data(execution.id) task_id = execution.id.task_id if entity_interface is None: entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version) entity_interface = entity_definition.interface - return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) + if get_task_exec_data: + try: + execution_data = self.client.get_task_execution_data(execution.id) + return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) + except Exception as e: + logger.error(f"Failed to get data for successful task execution: {execution.id}, error: {e}") + raise + return execution ############################# # Terminate Execution State #