Skip to content

Robust signaling for coordinator inference#3563

Queued
tdene wants to merge 48 commits intoNVIDIA:mainfrom
tdene:tde/robust_coordinator_signaling
Queued

Robust signaling for coordinator inference#3563
tdene wants to merge 48 commits intoNVIDIA:mainfrom
tdene:tde/robust_coordinator_signaling

Conversation

@tdene
Copy link
Contributor

@tdene tdene commented Feb 24, 2026

What does this PR do ?

State Diagram

          +------>  RUNNING  <------+
          |         +--+--+         | 
          |            |            |
       UNPAUSE      PAUSE          |
      (broadcast)  (idempotent)    |
          |            |            |
          |            v            |
          +-------  PAUSING        |
          |         +--+--+        |
          |            |           |
          |      EP all-reduce     |
          |       then world       |
          |       barrier          |
          |            |           |
          |            v           |
          +-------  PAUSED --------+
          |         +--+--+--+
          |            |     |     
          |        SUSPEND  STOP --------+
          |            |                 |
          |            v                 |
          |     SUSPENDING               |
          |         |                    |
          |    world barrier             |
          |         |                    |
          |         v                    v
          |     SUSPENDED ----STOP---> STOPPING
          |         |                    |
          |       RESUME            world barrier
          |         |                    |
          |         v                    v
          |      RESUMING              STOPPED
          |         |                  teardown, exit
          |    world barrier
          |         |
          +----  PAUSED
              (then UNPAUSE)  

PAUSE Protocol (details)

Client           Coordinator         mp_src (engine)      mp_workers
  |--PAUSE---------->|                    |                   |
  |                  | state = PAUSED     |                   |
  |                  |---PAUSE----------->|                   |
  |                  |---PAUSE----------->| (all dp_ranks)    |
  |                  |                    |--PUB PAUSE------->|
  |                  |                    |                   |
  |                  |                    | state = PAUSING   |
  |                  |                    | EP all-reduce:    |
  |                  |                    |   report 0        |
  |                  |                    |   dummy forward   |
  |                  |                    |   until consensus |
  |                  |                    |                   |
  |                  |                    | world barrier     |
  |                  |                    | (all ranks sync)  |
  |                  |                    |                   |
  |                  |                    | state = PAUSED    |
  |                  |                    | paused.set()      |

Contribution process

flowchart LR
    A[Pre-checks] --> B[PR Tests]
    subgraph Code Review/Approval
        C1[Expert Review] --> C2[Final Review]
    end
    B --> C1
    C2 --> D[Merge]
Loading

Pre-checks

  • I want this PR in a versioned release and have added the appropriate Milestone (e.g., Core 0.8)
  • I have added relevant unit tests
  • I have added relevant functional tests
  • I have added proper typing to my code Typing guidelines
  • I have added relevant documentation
  • I have run the autoformatter.sh on my PR

Code review

The following process is enforced via the CODEOWNERS file for changes into megatron/core. For changes outside of megatron/core, it is up to the PR author whether or not to tag the Final Reviewer team.

For MRs into `main` branch

Feel free to message or comment the @mcore-oncall to help accelerate your merge into main. The less complex your PR is, the faster it will be approved and merged!

(Step 1): Add PR label Expert Review

(Step 2): Collect the expert reviewers reviews

  1. Attach the Expert Review label when your PR is ready for review.
  2. GitHub auto-assigns expert reviewers based on your changes. They will get notified and pick up your PR soon.

⚠️ Only proceed to the next step once all reviewers have approved, merge-conflict are resolved and the CI is passing.
Final Review might get declined if these requirements are not fulfilled.

(Step 3): Final Review

  1. Add Final Review label
  2. GitHub auto-assigns final reviewers based on your changes. They will get notified and pick up your PR soon.

(Optional Step 4): Cherry-pick into release branch

If this PR also needs to be merged into core_r* release branches, after this PR has been merged, select Cherry-pick to open a new PR into the release branch.

For MRs into `dev` branch The proposed review process for `dev` branch is under active discussion.

MRs are mergable after one approval by either eharper@nvidia.com or zijiey@nvidia.com.

Merging your PR

Any member of core-adlr and core-nemo will be able to merge your PR.

@tdene tdene requested review from a team as code owners February 24, 2026 16:59
@svcnvidia-nemo-ci svcnvidia-nemo-ci added this to the Core 0.16 milestone Feb 24, 2026
@svcnvidia-nemo-ci svcnvidia-nemo-ci requested a review from a team February 24, 2026 17:00
@tdene tdene force-pushed the tde/robust_coordinator_signaling branch 2 times, most recently from 8d030c7 to d07419d Compare February 24, 2026 18:30
@tdene tdene force-pushed the tde/robust_coordinator_signaling branch from 24dfc98 to 84000ba Compare February 25, 2026 12:54
@janEbert
Copy link
Contributor

Hey, this still saw a lot of active work after opening it. Is it still in a WIP state? If yes, could you mark it as a draft, please? :)

Comment on lines +180 to +185
identities = self.identities_of_data_parallel_ranks
if not identities:
raise RuntimeError("No engines connected")
idx = self._round_robin_idx % len(identities)
self._round_robin_idx = idx + 1
return identities[idx]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this because the list of engines connected to the coordinator is not static, so we can't build the iterator ahead of time.

SUBMIT_REQUEST = auto()
ENGINE_REPLY = auto()
PAUSE = auto()
PAUSE_ACK = auto()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the ACK logic altogether as it is very flaky.

Comment on lines 75 to -77
self.running = asyncio.Event()
self.paused = asyncio.Event()
self.stopped = asyncio.Event()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of these are actually useful. They were added when the ACK logic would propagate readiness of engine up into the client. But that's very flaky.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nice, the inference client is simple again. Love this

reply = msgpack.unpackb(self.socket.recv(), raw=False)[0]
assert Headers(reply) == Headers.CONNECT_ACK

async def start(self, loop: Optional[asyncio.AbstractEventLoop] = None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this async serves no purpose. _connect_with_inference_coordinator already blocks until the appropriate time.

@tdene
Copy link
Contributor Author

tdene commented Mar 3, 2026

@tdene how do we test that this:

  • Does not cause any slowdown on top of what we have.
  • Does not lead to the accuracy regressions

In terms of accuracy regression, I don't think we currently have the ability to test this for any PR. That said, this does not affect rollout generation, so I do not see a way in which it could cause accuracy regression.

In terms of slowdown, there's functional tests in CI to check against slowdown. Also this PR actives a unit-test to verify the coordinator does not slow down (and it does not slow down; this unit test was written but inactive for a few months now).

@yobibyte
Copy link
Contributor

yobibyte commented Mar 3, 2026

@tdene how do we test that this:

  • Does not cause any slowdown on top of what we have.
  • Does not lead to the accuracy regressions

In terms of accuracy regression, I don't think we currently have the ability to test this for any PR. That said, this does not affect rollout generation, so I do not see a way in which it could cause accuracy regression.

In terms of slowdown, there's functional tests in CI to check against slowdown. Also this PR actives a unit-test to verify the coordinator does not slow down (and it does not slow down; this unit test was written but inactive for a few months now).

Could you, please, run just functional with this before/after and compare the time it takes to finish? You can prob use my test harness for that.

@tdene tdene force-pushed the tde/robust_coordinator_signaling branch from b9c4063 to c3a315e Compare March 4, 2026 00:42
@tdene
Copy link
Contributor Author

tdene commented Mar 4, 2026

@tdene how do we test that this:

  • Does not cause any slowdown on top of what we have.
  • Does not lead to the accuracy regressions

In terms of accuracy regression, I don't think we currently have the ability to test this for any PR. That said, this does not affect rollout generation, so I do not see a way in which it could cause accuracy regression.
In terms of slowdown, there's functional tests in CI to check against slowdown. Also this PR actives a unit-test to verify the coordinator does not slow down (and it does not slow down; this unit test was written but inactive for a few months now).

Could you, please, run just functional with this before/after and compare the time it takes to finish? You can prob use my test harness for that.

Done! They both took about 45 minutes, with this branch finishing slightly early at 02:15:16.136000, and main finishing at 02:15:29.290000.

Copy link
Contributor

@yobibyte yobibyte left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey! Left some comments. I did not check the FSM logic properly, inference folks would be a better fit for this.

self.received_stop: bool = False
self.suspend_signal = False
self.is_suspended = False
for attr in self._STATE_EVENTS.values():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally dislike setattr() as they make code less readable and less observable even in the debugger. IMO, it'll be much nicer to have a self._state_events as:

self._state_events = {k: asyncio.Event(),for k in _STATE_EVENTS}

With this, you do not need the string representation, and you can easily print the state by just printing the state_events dict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addresed!

# coordinator.
if self.is_suspended:
# Skip if already suspended or in the process of suspending.
if self.state in (EngineState.SUSPENDED, EngineState.SUSPENDING):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will either make a method that checks this or make SUS_STATES=(EngineState.SUSPENDED, EngineState.SUSPENDING) and check self.state in SUS_STATES.

This will be easier to maintain in case we want to add another state here. Otherwise, we will have to modify each if self.state in ... which you duplicate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree with this one for two reasons:

  • This call is in the suspend method, and this method can be assumed to take responsibility.
  • The list of states being checked here should never changed. It will always be SUSPENDED and SUSPENDING.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got triggered because you have self.state in (SUSPENDING, SUSPEND) 3 or 4 times in this PR.

return "Megatron Dynamic Inference Server is running."

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=8192)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a constant for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by reverting all changes to this file; #3648 will soon rewrite it anyway.

@tdene tdene enabled auto-merge March 4, 2026 19:50
@tdene tdene added this pull request to the merge queue Mar 4, 2026
@svcnvidia-nemo-ci
Copy link

🔄 Merge queue validation started!

You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22688597937

@svcnvidia-nemo-ci
Copy link

🔄 Merge queue validation started!

You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22692993863

@svcnvidia-nemo-ci
Copy link

🔄 Merge queue validation started!

You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22695853007

@svcnvidia-nemo-ci
Copy link

🔄 Merge queue validation started!

You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22701050201

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Expert Review Apply this label to indicate that your PR is ready for expert review.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants