Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,26 +367,28 @@ def PushAppOutputs(
"""Push ServerApp process outputs."""
log(DEBUG, "ServerAppIoServicer.PushAppOutputs")

# Validate the token
# Validate token and bind request to the token-owned run.
run_id = self._verify_token(request.token, context)
if request.run_id != run_id:
context.abort(grpc.StatusCode.PERMISSION_DENIED, "Invalid token.")
Comment thread
msheller marked this conversation as resolved.

# Init state and store
state = self.state_factory.state()
store = self.objectstore_factory.store()

# Abort if the run is not running
abort_if(
request.run_id,
run_id,
[Status.PENDING, Status.STARTING, Status.FINISHED],
state,
store,
context,
)

state.set_serverapp_context(request.run_id, context_from_proto(request.context))
state.set_serverapp_context(run_id, context_from_proto(request.context))

# Remove the token
state.delete_token(run_id)
# Keep token until terminal status is committed. If shutdown finalization fails,
# heartbeat expiry still needs the token to trigger run finalization fallback.
return PushAppOutputsResponse()
Comment on lines +390 to 392
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Delete token before entering shutdown cleanup

Keeping the token alive in PushAppOutputs creates a new failure window during normal shutdown: on_exit in framework/py/flwr/server/serverapp/app.py stops the heartbeat thread before it calls UpdateRunStatus, so if log upload/join or network teardown delays finalization for ~60s, token cleanup can mark the still-successful run as FINISHED/FAILED before UpdateRunStatus(FINISHED/COMPLETED) arrives. At that point abort_if(...[Status.FINISHED]...) rejects the completion update, so a successful run is persisted as failed under slow shutdown conditions.

Useful? React with 👍 / 👎.


def UpdateRunStatus(
Expand All @@ -403,14 +405,32 @@ def UpdateRunStatus(
abort_if(request.run_id, [Status.FINISHED], state, store, context)

# Update the run status
state.update_run_status(
updated = state.update_run_status(
run_id=request.run_id, new_status=run_status_from_proto(request.run_status)
)
if not updated:
# Keep token unchanged when the transition fails; it can still expire and
# trigger heartbeat-based fallback finalization.
context.abort(
grpc.StatusCode.FAILED_PRECONDITION,
f"Failed to update status for run {request.run_id}",
)

# If the run is finished, delete the run from ObjectStore
# If the run is finished, delete token immediately after status persistence.
# This prevents expiry cleanup from flipping a completed run while cleanup I/O
# is still in progress.
if request.run_status.status == Status.FINISHED:
# Delete all objects related to the run
store.delete_objects_in_run(request.run_id)
state.delete_token(request.run_id)
try:
# Delete all objects related to the run
store.delete_objects_in_run(request.run_id)
except Exception as exc: # pylint: disable=broad-exception-caught
log(
ERROR,
"Failed to delete objects for run %d: %s",
request.run_id,
exc,
)

return UpdateRunStatusResponse()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ def test_pull_message_from_expired_message_error(self) -> None:
assert list(object_ids_in_response) == [msg_res.object_id]

def test_push_serverapp_outputs_successful_if_running(self) -> None:
"""Test `PushServerAppOutputs` success."""
"""Test `PushServerAppOutputs` success and token retention."""
# Prepare
run_id = self._create_dummy_run(running=False)
token = self.state.create_token(run_id)
Expand Down Expand Up @@ -724,6 +724,36 @@ def test_push_serverapp_outputs_successful_if_running(self) -> None:
# Assert
assert isinstance(response, PushAppOutputsResponse)
assert grpc.StatusCode.OK == call.code()
assert self.state.get_run_id_by_token(token) == run_id

def test_push_serverapp_outputs_fails_if_token_run_id_mismatch(self) -> None:
"""Reject outputs when request.run_id does not match token-bound run."""
# Prepare
run_id_a = self._create_dummy_run(running=False)
run_id_b = self._create_dummy_run(running=False)
token = self.state.create_token(run_id_a)
assert token is not None
self._transition_run_status(run_id_a, 2)
self._transition_run_status(run_id_b, 2)

maker = RecordMaker()
context = Context(
run_id=run_id_b,
node_id=0,
node_config=maker.user_config(),
state=maker.recorddict(1, 1, 1),
run_config=maker.user_config(),
)
request = PushAppOutputsRequest(
token=token, run_id=run_id_b, context=context_to_proto(context)
)

# Execute/Assert
with self.assertRaises(grpc.RpcError) as e:
self._push_serverapp_outputs.with_call(request=request)
assert e.exception.code() == grpc.StatusCode.PERMISSION_DENIED
assert self.state.get_serverapp_context(run_id_a) is None
assert self.state.get_serverapp_context(run_id_b) is None

def _assert_push_serverapp_outputs_not_allowed(
self, token: str, context: Context
Expand Down Expand Up @@ -823,6 +853,113 @@ def test_update_run_status_not_successful_if_finished(self) -> None:
assert e.exception.code() == grpc.StatusCode.PERMISSION_DENIED
assert e.exception.details() == self.status_to_msg[run_status.status]

def test_update_run_status_invalid_transition_returns_failed_precondition(
self,
) -> None:
"""Test invalid transition returns FAILED_PRECONDITION and keeps token."""
# Prepare
run_id = self._create_dummy_run(running=False)
token = self.state.create_token(run_id)
assert token is not None
self._transition_run_status(run_id, 2) # Move run to RUNNING

# RUNNING -> STARTING is invalid and should fail.
request = UpdateRunStatusRequest(
run_id=run_id,
run_status=run_status_to_proto(RunStatus(Status.STARTING, "", "")),
)

# Execute/Assert
with self.assertRaises(grpc.RpcError) as e:
self._update_run_status.with_call(request=request)
assert e.exception.code() == grpc.StatusCode.FAILED_PRECONDITION
assert self.state.get_run_id_by_token(token) == run_id

def test_update_run_status_finished_deletes_token(self) -> None:
"""Test successful terminal transition deletes token."""
# Prepare
run_id = self._create_dummy_run(running=False)
token = self.state.create_token(run_id)
assert token is not None
self._transition_run_status(run_id, 2) # Move run to RUNNING

request = UpdateRunStatusRequest(
run_id=run_id,
run_status=run_status_to_proto(
RunStatus(Status.FINISHED, SubStatus.COMPLETED, "done")
),
)

# Execute
response, call = self._update_run_status.with_call(request=request)

# Assert
assert isinstance(response, UpdateRunStatusResponse)
assert grpc.StatusCode.OK == call.code()
assert self.state.get_run_id_by_token(token) is None

def test_update_run_status_finished_deletes_token_if_cleanup_fails(self) -> None:
"""Delete token even if FINISHED object-store cleanup raises."""
# Prepare
run_id = self._create_dummy_run(running=False)
token = self.state.create_token(run_id)
assert token is not None
self._transition_run_status(run_id, 2) # Move run to RUNNING
request = UpdateRunStatusRequest(
run_id=run_id,
run_status=run_status_to_proto(
RunStatus(Status.FINISHED, SubStatus.COMPLETED, "done")
),
)

# Execute/Assert
with patch.object(
self.store,
"delete_objects_in_run",
side_effect=RuntimeError("object cleanup failed"),
):
response, call = self._update_run_status.with_call(request=request)
assert isinstance(response, UpdateRunStatusResponse)
assert grpc.StatusCode.OK == call.code()
assert self.state.get_run_id_by_token(token) is None
run_status = self.state.get_run_status({run_id})[run_id]
assert run_status.status == Status.FINISHED
assert run_status.sub_status == SubStatus.COMPLETED

def test_update_run_status_finished_deletes_token_before_cleanup(self) -> None:
"""Delete token before starting FINISHED object-store cleanup."""
# Prepare
run_id = self._create_dummy_run(running=False)
token = self.state.create_token(run_id)
assert token is not None
self._transition_run_status(run_id, 2) # Move run to RUNNING
request = UpdateRunStatusRequest(
run_id=run_id,
run_status=run_status_to_proto(
RunStatus(Status.FINISHED, SubStatus.COMPLETED, "done")
),
)
cleanup_called = {"value": False}

def _cleanup_and_assert_token_deleted(run_id_to_delete: int) -> None:
assert run_id_to_delete == run_id
cleanup_called["value"] = True
assert self.state.get_run_id_by_token(token) is None

# Execute
with patch.object(
self.store,
"delete_objects_in_run",
side_effect=_cleanup_and_assert_token_deleted,
):
response, call = self._update_run_status.with_call(request=request)

# Assert
assert isinstance(response, UpdateRunStatusResponse)
assert grpc.StatusCode.OK == call.code()
assert cleanup_called["value"]
assert self.state.get_run_id_by_token(token) is None

@parameterized.expand([(True,), (False,)]) # type: ignore
def test_send_app_heartbeat(self, success: bool) -> None:
"""Test sending an app heartbeat."""
Expand Down
Loading