Robust signaling for coordinator inference #3563
Conversation
Hey, this still saw a lot of active work after opening it. Is it still in a WIP state? If yes, could you mark it as a draft, please? :)
```python
identities = self.identities_of_data_parallel_ranks
if not identities:
    raise RuntimeError("No engines connected")
idx = self._round_robin_idx % len(identities)
self._round_robin_idx = idx + 1
return identities[idx]
```
Doing this because the list of engines connected to the coordinator is not static, so we can't build the iterator ahead of time.
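For context, a self-contained sketch of this pattern (the class and attribute names here are illustrative, not the PR's actual ones):

```python
class EngineRegistry:
    """Round-robin over a membership list that can change between picks."""

    def __init__(self):
        self.identities = []       # engines may connect/disconnect at any time
        self._round_robin_idx = 0

    def next_engine(self):
        if not self.identities:
            raise RuntimeError("No engines connected")
        # Re-derive the index on every call so a shrinking list never
        # leaves us pointing past the end. A prebuilt iterator such as
        # itertools.cycle(...) would capture a stale snapshot of the list.
        idx = self._round_robin_idx % len(self.identities)
        self._round_robin_idx = idx + 1
        return self.identities[idx]
```

The modulo on every pick is what makes this safe against membership changes: the stored counter may be stale, but the index it produces is always in range for the current list.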
```python
SUBMIT_REQUEST = auto()
ENGINE_REPLY = auto()
PAUSE = auto()
PAUSE_ACK = auto()
```
Remove the ACK logic altogether as it is very flaky.
```python
self.running = asyncio.Event()
self.paused = asyncio.Event()
self.stopped = asyncio.Event()
```
None of these are actually useful. They were added when the ACK logic propagated engine readiness up into the client, but that logic is very flaky.
ah nice, the inference client is simple again. Love this
```python
reply = msgpack.unpackb(self.socket.recv(), raw=False)[0]
assert Headers(reply) == Headers.CONNECT_ACK
```
```python
async def start(self, loop: Optional[asyncio.AbstractEventLoop] = None):
```
Making this async serves no purpose. `_connect_with_inference_coordinator` already blocks until the appropriate time.
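A hedged sketch of the suggested simplification (class name and bodies here are illustrative, not the PR's actual implementation):

```python
import asyncio
from typing import Optional


class InferenceClient:
    def _connect_with_inference_coordinator(self):
        # Blocks until the coordinator replies with CONNECT_ACK,
        # so callers need no extra synchronization after start().
        ...

    def start(self, loop: Optional[asyncio.AbstractEventLoop] = None):
        # Plain synchronous method: the blocking connect already provides
        # the readiness barrier that the async variant was trying to add.
        self._loop = loop or asyncio.new_event_loop()
        self._connect_with_inference_coordinator()
```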
In terms of accuracy regression, I don't think we currently have the ability to test this for any PR. That said, this does not affect rollout generation, so I do not see a way in which it could cause an accuracy regression. In terms of slowdown, there are functional tests in CI to check against slowdown. This PR also activates a unit test verifying that the coordinator does not slow down (it does not; this unit test was written months ago but has been inactive until now).
Could you please run
Done! They both took about 45 minutes, with this branch finishing slightly early at
yobibyte left a comment
There was a problem hiding this comment.
Hey! Left some comments. I did not check the FSM logic properly; the inference folks would be a better fit for that.
```python
self.received_stop: bool = False
self.suspend_signal = False
self.is_suspended = False
for attr in self._STATE_EVENTS.values():
```
I personally dislike setattr(), as it makes code less readable and harder to observe, even in the debugger. IMO, it would be much nicer to have a self._state_events dict:

```python
self._state_events = {k: asyncio.Event() for k in _STATE_EVENTS}
```

With this, you do not need the string representation, and you can easily print the state by just printing the _state_events dict.
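A self-contained sketch of this alternative (the enum members here are assumed from the diff; the method names are made up for illustration):

```python
import asyncio
from enum import Enum, auto


class EngineState(Enum):
    RUNNING = auto()
    SUSPENDING = auto()
    SUSPENDED = auto()


class Engine:
    def __init__(self):
        # One Event per state, addressable by enum member instead of
        # dynamically named attributes set via setattr(). The whole
        # state is visible by printing one dict in a debugger.
        self._state_events = {state: asyncio.Event() for state in EngineState}

    def enter(self, state: EngineState):
        self._state_events[state].set()

    async def wait_for(self, state: EngineState):
        await self._state_events[state].wait()
```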
```python
# coordinator.
if self.is_suspended:
# Skip if already suspended or in the process of suspending.
if self.state in (EngineState.SUSPENDED, EngineState.SUSPENDING):
```
I would either make a method that checks this, or define SUS_STATES = (EngineState.SUSPENDED, EngineState.SUSPENDING) and check self.state in SUS_STATES.
This will be easier to maintain in case we want to add another state here; otherwise we will have to modify each duplicated if self.state in ... check.
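A sketch of this suggestion (enum values follow the diff; the helper name and suspend() body are made up for illustration):

```python
from enum import Enum, auto


class EngineState(Enum):
    RUNNING = auto()
    SUSPENDING = auto()
    SUSPENDED = auto()


# Single source of truth for "suspension requested or completed";
# adding a new state later means touching only this tuple.
SUS_STATES = (EngineState.SUSPENDING, EngineState.SUSPENDED)


class Engine:
    def __init__(self):
        self.state = EngineState.RUNNING

    @property
    def is_suspending_or_suspended(self) -> bool:
        return self.state in SUS_STATES

    def suspend(self):
        # Idempotent: a repeated suspend request is a no-op.
        if self.is_suspending_or_suspended:
            return
        self.state = EngineState.SUSPENDING
```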
I disagree with this one for two reasons:
- This call is in the suspend method, and that method can be assumed to take responsibility.
- The list of states being checked here should never change. It will always be SUSPENDED and SUSPENDING.
I got triggered because you have self.state in (SUSPENDING, SUSPEND) 3 or 4 times in this PR.
```python
return "Megatron Dynamic Inference Server is running."

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=8192)
```
There should be a constant for this.
Addressed by reverting all changes to this file; #3648 will soon rewrite it anyway.
🔄 Merge queue validation started! You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22688597937

🔄 Merge queue validation started! You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22692993863

🔄 Merge queue validation started! You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/22695853007
What does this PR do?
State Diagram
PAUSE Protocol (details)
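A heavily hedged sketch of an ACK-free pause flow, using only the Headers and EngineState names visible in the diff above (all of the logic here is assumed for illustration, not taken from the PR):

```python
from enum import Enum, auto


class Headers(Enum):
    SUBMIT_REQUEST = auto()
    ENGINE_REPLY = auto()
    PAUSE = auto()       # PAUSE_ACK removed: pause is fire-and-forget


class EngineState(Enum):
    RUNNING = auto()
    SUSPENDING = auto()
    SUSPENDED = auto()


class Engine:
    def __init__(self):
        self.state = EngineState.RUNNING
        self.in_flight = 0  # requests currently being served

    def handle(self, header: Headers):
        # Without an ACK, the engine drains in-flight work on its own
        # and settles into SUSPENDED; nobody waits on a reply message.
        if header is Headers.PAUSE and self.state is EngineState.RUNNING:
            self.state = (EngineState.SUSPENDED if self.in_flight == 0
                          else EngineState.SUSPENDING)

    def request_done(self):
        self.in_flight -= 1
        if self.state is EngineState.SUSPENDING and self.in_flight == 0:
            self.state = EngineState.SUSPENDED
```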