
ft_launcher: integrate log analysis attribution for restart decisions#269

Open
namitdhameja wants to merge 1 commit into main from attr-ft-integration

Conversation

@namitdhameja
Contributor

Adds log analysis attribution to the FT launcher restart path. After workers fail, the launcher runs log analysis on the cycle log before deciding whether to restart. If attribution identifies a non-transient fault, it stops instead of restarting. If attribution latency exceeds the configured timeout, the decision is made without waiting for the attribution result.

CLI options:
--ft-attribution-loganalysis,
--ft-attribution-timeout,
--ft-slack-channel,
--ft-slack-token-file,
--ft-dataflow-index.

Attribution library:
1. Sync bridge to the async LogAnalyzer via a dedicated daemon thread + event loop; results cached per path
2. Centralized post-processing code (Slack, dataflow), making it available to both the service and the library

FT launcher integration
1. Three modes: lib (in-process), mcp (subprocess), url (HTTP service).
2. Config for Slack, dataflow, and attribution (mode + timeout)
3. Invoked in _handle_restart_decision: attribution wall time is deducted from the GPU reclaim timeout budget.
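The budget accounting in point 3 can be sketched like this. The function names (`restart_with_budget` and its callbacks) are hypothetical; only the deduction idea comes from the PR.

```python
import time

def restart_with_budget(run_attribution, restart_workers, gpu_reclaim_timeout):
    # Measure attribution wall time and charge it against the reclaim budget.
    start = time.monotonic()
    should_stop = run_attribution()  # may block up to its own configured timeout
    elapsed = time.monotonic() - start
    if should_stop:
        return False  # non-transient fault: stop instead of restarting
    # Pass the reduced budget on so attribution latency never extends
    # the overall restart window.
    remaining = max(0.0, gpu_reclaim_timeout - elapsed)
    restart_workers(remaining)
    return True
```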

@namitdhameja namitdhameja self-assigned this Feb 27, 2026
@namitdhameja namitdhameja added the ci-approved Approved to run CI label Feb 27, 2026
@greptile-apps
Contributor

greptile-apps bot commented Feb 27, 2026

Greptile Summary

Integrates log analysis attribution into the FT launcher restart decision path. After worker failures, the launcher analyzes cycle logs to determine if the fault is transient before restarting.

Key changes:

  • Three attribution modes: lib (in-process), mcp (subprocess), url (HTTP service)
  • New LogAnalysisClient with sync bridge to async LogAnalyzer via dedicated event loop thread
  • Attribution time is deducted from GPU reclaim timeout budget to prevent delays
  • Centralized postprocessing (Slack, dataflow) shared between lib/mcp and service modes
  • Six new CLI options: --ft-attribution-loganalysis, --ft-attribution-timeout, --ft-attribution-dry-run, --ft-slack-channel, --ft-slack-token-file, --ft-dataflow-index
  • Restart decision flow: attribution → progress tracker → remaining restarts check
  • If attribution identifies non-transient fault, stops instead of restarting (unless dry-run mode)
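The decision order in the last three bullets can be condensed into a minimal sketch; the predicate names are illustrative, not the launcher's actual signatures.

```python
def decide_restart(attribution_says_stop, dry_run, made_progress, restarts_left):
    # Order mirrors the flow above: attribution -> progress tracker -> restart budget.
    if attribution_says_stop and not dry_run:
        return False  # non-transient fault: stop (dry-run only logs and proceeds)
    if not made_progress:
        return False  # progress tracker vetoes the restart
    return restarts_left > 0  # finally, check the remaining-restarts budget
```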

Confidence Score: 5/5

  • Safe to merge - well-structured integration with proper error handling and timeout management
  • Clean architecture with three distinct modes, proper thread safety in the event loop initialization, comprehensive timeout accounting, and good error handling throughout. Previous review concerns have been addressed with proper flag resets and analyzer cleanup.
  • No files require special attention - implementation is solid across all changed files

Important Files Changed

Filename Overview
src/nvidia_resiliency_ext/fault_tolerance/ft_attribution.py New attribution integration module - creates HTTP client and LogAnalysisClient with three modes (lib, mcp, url)
src/nvidia_resiliency_ext/attribution/log_analyzer/runner.py New sync bridge to async LogAnalyzer - uses dedicated thread + event loop for in-process and MCP modes
src/nvidia_resiliency_ext/fault_tolerance/launcher.py Integrates attribution into restart decision flow with timeout accounting and GPU reclaim budget management
src/nvidia_resiliency_ext/fault_tolerance/config.py Adds SlackConfig dataclass and attribution config fields with file-based token support
src/nvidia_resiliency_ext/fault_tolerance/ft_rendezvous_barrier.py Replaces old AttributionService with LogAnalysisClient integration in rendezvous handler
src/nvidia_resiliency_ext/attribution/postprocessing/config.py Adds configure_postprocessing_resolved for centralized Slack/dataflow setup with env fallback

Sequence Diagram

sequenceDiagram
    participant Agent as LocalElasticAgent
    participant Handler as _handle_restart_decision
    participant Attribution as LogAnalysisClient
    participant Runner as runner (lib/mcp)
    participant Service as AttributionService (url)
    participant Progress as ProgressTracker
    participant Workers as _restart_workers

    Agent->>Handler: Worker failure detected
    Handler->>Handler: Notify peers & open rendezvous
    Handler->>Attribution: _run_attribution()
    
    alt lib or mcp mode
        Attribution->>Runner: fetch_result(log_path)
        Runner->>Runner: run_log_analysis_sync()
        Runner-->>Attribution: result or None
    else url mode
        Attribution->>Service: GET /logs?log_path=...
        Service-->>Attribution: HTTP 200 with result
    end
    
    Attribution-->>Handler: should_stop decision
    
    alt attribution says stop
        Handler->>Handler: Check dry_run flag
        alt dry_run enabled
            Handler->>Handler: Log but proceed
        else dry_run disabled
            Handler-->>Agent: return False (no restart)
        end
    end
    
    Handler->>Progress: analyze_previous_cycle()
    Progress-->>Handler: should_terminate_early
    
    alt no progress detected
        Handler-->>Agent: return False (no restart)
    else restarts available
        Handler->>Workers: _restart_workers(time_consumed)
        Workers->>Workers: _stop_workers(will_restart=True)
        Workers->>Workers: Deduct attribution time from GPU reclaim budget
        Workers->>Workers: _start_workers()
        Handler-->>Agent: return True (restarted)
    end

Last reviewed commit: abc9fb5

@greptile-apps greptile-apps bot left a comment

21 files reviewed, 2 comments


if _lib_loop_starting:
    pass  # another caller already started the thread; wait below
else:
    _lib_loop_starting = True

_lib_loop_starting is never reset to False on failure. If the thread fails to start, subsequent callers will skip thread creation but wait indefinitely for _lib_loop_ready.

Suggested change
-        _lib_loop_starting = True
+        _lib_loop_starting = True
+        _lib_loop_ready.clear()

Comment on lines +87 to +89

if not use_lib:
    future = asyncio.run_coroutine_threadsafe(_lib_analyzer.connect_mcp(), _lib_loop)
    future.result(timeout=30)

If the MCP connect fails, _lib_analyzer is left in a partially initialized state. The next call will skip initialization (the line 74 check passes), but the analyzer won't be connected.


Labels

ci-approved Approved to run CI
