Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions framework/py/flwr/common/exit/exit.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def flwr_exit(
-----
This function MUST be called from the main thread.
"""
print(f"DEBUG: `flwr_exit` called with code {code} and message: {message}")
is_error = not 0 <= code < 100 # 0-99 are success exit codes

# Construct exit message
Expand Down Expand Up @@ -98,6 +99,7 @@ def flwr_exit(

# Start a daemon thread to force exit if graceful exit fails
def force_exit() -> None:
print(f"DEBUG: Force exit thread started, will exit in {FORCE_EXIT_TIMEOUT_SECONDS} seconds if not already exited.")
time.sleep(FORCE_EXIT_TIMEOUT_SECONDS)
os._exit(sys_exit_code)

Expand Down
1 change: 1 addition & 0 deletions framework/py/flwr/common/exit/signal_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def graceful_exit_handler(signalnum: int, _frame: FrameType) -> None:
When called will reset signal handler to original signal handler from
default_handlers.
"""
print(f"DEBUG: Received signal {signalnum}, initiating graceful shutdown.")
# Reset to default handler
signal.signal(signalnum, default_handlers[signalnum]) # type: ignore

Expand Down
14 changes: 2 additions & 12 deletions framework/py/flwr/simulation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from flwr.simulation.simulationio_connection import SimulationIoConnection
from flwr.supercore.app_utils import start_parent_process_monitor
from flwr.supercore.constant import NOOP_FEDERATION
from flwr.supercore.heartbeat import HeartbeatSender, make_app_heartbeat_fn_grpc
from flwr.supercore.heartbeat import make_app_heartbeat_fn_grpc
from flwr.supercore.superexec.dependency_installer import (
cleanup_app_runtime_environment,
install_app_dependencies,
Expand Down Expand Up @@ -178,18 +178,13 @@ def run_simulation_process( # pylint: disable=R0913, R0914, R0915, R0917, W0212
# Initialize variables for finally block
log_uploader = None
run_id_hash = None
heartbeat_sender = None
run = None
run_status = None
run_id_hash = None
runtime_env_dir = None
exit_code = ExitCode.SUCCESS

def on_exit() -> None:
# Stop heartbeat sender
if heartbeat_sender and heartbeat_sender.is_running:
heartbeat_sender.stop()

# Stop log uploader for this run and upload final logs
if log_uploader:
stop_log_uploader(log_queue, log_uploader)
Expand Down Expand Up @@ -293,12 +288,6 @@ def on_exit() -> None:
},
)

# Set up heartbeat sender
heartbeat_sender = HeartbeatSender(
make_app_heartbeat_fn_grpc(conn._stub, token)
)
heartbeat_sender.start()

# Launch the simulation
updated_context = _run_simulation(
server_app_attr=server_app_attr,
Expand All @@ -312,6 +301,7 @@ def on_exit() -> None:
verbose_logging=verbose,
server_app_context=context,
is_app=True,
heartbeat_fn=make_app_heartbeat_fn_grpc(conn._stub, token),
exit_event=EventType.FLWR_SIMULATION_RUN_LEAVE,
)

Expand Down
30 changes: 29 additions & 1 deletion framework/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import platform
import threading
import traceback
from collections.abc import Callable
from logging import DEBUG, ERROR, INFO, WARNING
from queue import Empty, Queue
from typing import Any, cast
Expand Down Expand Up @@ -55,10 +56,16 @@
FLWR_IN_MEMORY_DB_NAME,
NOOP_FEDERATION,
)
from flwr.supercore.heartbeat import HeartbeatSender
from flwr.supercore.object_store import ObjectStoreFactory
from flwr.superlink.federation import NoOpFederationManager


def _noop_heartbeat_fn() -> bool:
"""Return a successful heartbeat result for local simulation runs."""
return True


def _replace_keys(d: Any, match: str, target: str) -> Any:
if isinstance(d, dict):
return {
Expand Down Expand Up @@ -156,6 +163,7 @@ def run_simulation(

_ = _run_simulation(
num_supernodes=num_supernodes,
heartbeat_fn=_noop_heartbeat_fn,
client_app=client_app,
server_app=server_app,
backend_name=backend_name,
Expand Down Expand Up @@ -241,6 +249,7 @@ def server_th_with_start_checks(
def _main_loop(
num_supernodes: int,
backend_name: str,
heartbeat_fn: Callable[[], bool],
backend_config_stream: str,
app_dir: str,
is_app: bool,
Expand All @@ -260,9 +269,11 @@ def _main_loop(
)

f_stop = threading.Event()
# A Threading event to indicate if an exception was raised in the ServerApp thread
# A threading event to indicate if any simulation-managed thread failed.
server_app_thread_has_exception = threading.Event()
serverapp_th = None
heartbeat_sender = HeartbeatSender(heartbeat_fn)
previous_threading_excepthook = threading.excepthook
success = True
if server_app_context is None:
server_app_context = Context(
Expand Down Expand Up @@ -299,6 +310,18 @@ def _main_loop(
ctx_queue=output_context_queue,
)

def on_thread_exception(
args: threading.ExceptHookArgs,
) -> None:
if args.thread is heartbeat_sender._thread: # pylint: disable=protected-access
log(ERROR, "Heartbeat thread raised an exception: %s", args.exc_value)
# Trigger stop event for Simulation Engine
f_stop.set()
previous_threading_excepthook(args)

threading.excepthook = on_thread_exception
heartbeat_sender.start()

# Start Simulation Engine
vce.start_vce(
num_supernodes=num_supernodes,
Expand Down Expand Up @@ -338,6 +361,9 @@ def _main_loop(
serverapp_th.join(timeout=5)
if server_app_thread_has_exception.is_set():
raise RuntimeError("Exception in ServerApp thread")
threading.excepthook = previous_threading_excepthook
if heartbeat_sender.is_running:
heartbeat_sender.stop()

log(DEBUG, "Stopping Simulation Engine now.")
return updated_context
Expand All @@ -347,6 +373,7 @@ def _main_loop(
def _run_simulation(
num_supernodes: int,
exit_event: EventType,
heartbeat_fn: Callable[[], bool],
client_app: ClientApp | None = None,
server_app: ServerApp | None = None,
backend_name: str = "ray",
Expand Down Expand Up @@ -428,6 +455,7 @@ def _run_simulation(
args = (
num_supernodes,
backend_name,
heartbeat_fn,
backend_config_stream,
app_dir,
is_app,
Expand Down
1 change: 1 addition & 0 deletions framework/py/flwr/supercore/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def fn() -> bool:

# Raise SIGINT to trigger graceful shutdown if heartbeat failed
if not res.success:
print("DEBUG: Heartbeat failed, raising SIGINT to trigger graceful shutdown.")
signal.raise_signal(signal.SIGINT)
Comment on lines 155 to 157
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here

return True

Expand Down
Loading