From 5b11e454e2a64d0cb6624336ca907a855c5355e9 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 3 Nov 2025 13:31:34 -0800 Subject: [PATCH 1/3] don't fetch task execution data if the node is still running Signed-off-by: Yee Hing Tong --- flytekit/remote/remote.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 6ea65b863a..b490c2690d 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -80,6 +80,7 @@ from flytekit.models.admin import workflow as admin_workflow_models from flytekit.models.admin.common import Sort from flytekit.models.common import NamedEntityIdentifier +from flytekit.models.core import execution as core_execution_models from flytekit.models.core import identifier as id_models from flytekit.models.core import workflow as workflow_model from flytekit.models.core.identifier import Identifier, ResourceType, SignalIdentifier, WorkflowExecutionIdentifier @@ -2747,8 +2748,11 @@ def sync_node_execution( # This is the plain ol' task execution case else: execution._task_executions = [ + # Sync task execution but only get inputs/outputs if the overall execution is done self.sync_task_execution( - FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task.interface + FlyteTaskExecution.promote_from_model(t), + node_mapping[node_id].task_node.flyte_task.interface, + get_task_exec_data=execution.is_done, ) for t in iterate_task_executions(self.client, execution.id) ] @@ -2763,16 +2767,27 @@ def sync_node_execution( return execution def sync_task_execution( - self, execution: FlyteTaskExecution, entity_interface: typing.Optional[TypedInterface] = None + self, + execution: FlyteTaskExecution, + entity_interface: typing.Optional[TypedInterface] = None, + get_task_exec_data: bool = True, ) -> FlyteTaskExecution: """Sync a FlyteTaskExecution object with its corresponding remote state.""" + execution._closure = self.client.get_task_execution(execution.id).closure - execution_data = self.client.get_task_execution_data(execution.id) task_id = execution.id.task_id if entity_interface is None: entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version) entity_interface = entity_definition.interface - return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) + if get_task_exec_data and execution.closure.phase == core_execution_models.TaskExecutionPhase.SUCCEEDED: + try: + execution_data = self.client.get_task_execution_data(execution.id) + return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) + except Exception as e: + breakpoint() + logger.error(f"Failed to get data for successful task execution: {execution.id}, error: {e}") + raise + return execution ############################# # Terminate Execution State # From bae18bb85e6ea42befb31ff18f36aba5228514e9 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 3 Nov 2025 13:32:47 -0800 Subject: [PATCH 2/3] breakpoint Signed-off-by: Yee Hing Tong --- flytekit/remote/remote.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index b490c2690d..5c00f771dc 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -2784,7 +2784,6 @@ def sync_task_execution( execution_data = self.client.get_task_execution_data(execution.id) return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) except Exception as e: - breakpoint() logger.error(f"Failed to get data for successful task execution: {execution.id}, error: {e}") raise return execution From 337a892782f33b1d884e8ec0f6dacd766c9c6100 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 3 Nov 2025 13:40:51 -0800 Subject: [PATCH 3/3] remove task phase check now that we added the node level check Signed-off-by: Yee Hing Tong --- flytekit/remote/remote.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 5c00f771dc..5821b660f3 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -80,7 +80,6 @@ from flytekit.models.admin import workflow as admin_workflow_models from flytekit.models.admin.common import Sort from flytekit.models.common import NamedEntityIdentifier -from flytekit.models.core import execution as core_execution_models from flytekit.models.core import identifier as id_models from flytekit.models.core import workflow as workflow_model from flytekit.models.core.identifier import Identifier, ResourceType, SignalIdentifier, WorkflowExecutionIdentifier @@ -2779,7 +2778,7 @@ def sync_task_execution( if entity_interface is None: entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version) entity_interface = entity_definition.interface - if get_task_exec_data and execution.closure.phase == core_execution_models.TaskExecutionPhase.SUCCEEDED: + if get_task_exec_data: try: execution_data = self.client.get_task_execution_data(execution.id) return self._assign_inputs_and_outputs(execution, execution_data, entity_interface)