Wired Memory Management System #348
Conversation
So devs can customize admission gating
Tested end-to-end with the changes in the most recent mlx-swift-lm commits. The pipeline from here to inference over there is effectively wired together, with unit tests demonstrating the system, showing how to use it effectively, and preserving default behavior to some degree.

I added a policy-only mode for CPU and future CUDA workflows (I see open issues about CUDA support with indications of inclusion) and made it so Apple Silicon devices in CPU-only mode can still use `maximumRecommendedWorkingSetBytes` as a reference cap 😄
> MLX only provides the generic interfaces. MLXLMCommon (from mlx-swift-lm) provides LLM-focused policies such as `WiredSumPolicy`, `WiredMaxPolicy`, and `WiredFixedPolicy`. You can use `GPU.maxRecommendedWorkingSetBytes()` as a portable upper bound when designing custom policies.
I wonder if one or more of these should be in mlx-swift? `WiredSumPolicy` or `WiredMaxPolicy`, for example, might be generic enough and could be used in domains outside of LLMs. I do agree that the interesting policies will be domain specific.
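For what it's worth, a domain-agnostic sum/max policy could be quite small. The sketch below is illustrative only -- it assumes a simplified protocol (`SimpleWiredPolicy` and `desiredLimit(activeTicketSizes:)` are hypothetical names, not the PR's actual `WiredMemoryPolicy` requirements):

```swift
// Hypothetical sketch of domain-agnostic policies. Assumes a policy only
// needs to map the set of outstanding ticket sizes to a desired wired limit.
protocol SimpleWiredPolicy {
    /// Desired wired limit given the sizes of all active tickets.
    func desiredLimit(activeTicketSizes: [Int]) -> Int
}

/// Sum of all outstanding reservations: safe when tickets run concurrently.
struct SumPolicySketch: SimpleWiredPolicy {
    func desiredLimit(activeTicketSizes: [Int]) -> Int {
        activeTicketSizes.reduce(0, +)
    }
}

/// Largest single reservation: enough when tickets are serialized.
struct MaxPolicySketch: SimpleWiredPolicy {
    func desiredLimit(activeTicketSizes: [Int]) -> Int {
        activeTicketSizes.max() ?? 0
    }
}
```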
```swift
try await ticket.withWiredLimit {
    // run inference
}
```
Nice clear example!
```swift
// Reserve model weights without keeping the limit elevated while idle.
let weights = policy.ticket(size: weightsBytes, kind: .reservation)
_ = await weights.start()
```
Should there be a way to cancel this? E.g., you might have a server that loads and unloads models (weights). It might make sense to cancel the policy rather than this reservation, or if this returned a Cancellable you could cancel the specific reservation.
I think if this were inside a Task you could cancel the task (I can see how that works), but this example looks like you might want to hold a ticket long-term, and wrapping it in a Task is unwieldy.
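One possible shape, purely illustrative (`ReservationHandle` and `reserve()` are hypothetical names, not part of this PR): starting a reservation returns a handle that can be cancelled directly, without wrapping anything in a Task.

```swift
// Hypothetical shape for a cancellable reservation, as an alternative to
// wrapping start() in a Task.
public struct ReservationHandle: Sendable {
    private let onCancel: @Sendable () async -> Void

    init(onCancel: @escaping @Sendable () async -> Void) {
        self.onCancel = onCancel
    }

    /// Releases the reservation and lets the manager shrink the limit.
    public func cancel() async {
        await onCancel()
    }
}

// Usage sketch: a server loading and unloading model weights.
// let handle = await weightsTicket.reserve()   // hypothetical
// ... serve requests ...
// await handle.cancel()                        // unload: release reservation
```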
> ### Choosing a baseline
>
> When wired memory is unsupported, the manager will use:
This might be unclear -- if wired memory is unsupported what do these values actually do? I would think it would just be a NOP.
```swift
let result = mlx_set_wired_limit(&previous, 0)
guard result == 0 else { return nil }
var tmp: size_t = 0
_ = mlx_set_wired_limit(&tmp, previous)
```
I am concerned that temporarily setting it to 0 may cause trouble. Can we avoid this? What if the manager could answer this, and we just document that use of multiple managers outside a testing context is ill-advised or undefined? The manager sets the limit, so it should surely have this answer.
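A sketch of that idea -- the manager answers from its own bookkeeping, assuming a single manager per process. The actor caches the last limit it applied; all names here are illustrative:

```swift
// Sketch: the manager records every limit it sets, so reads never have to
// probe via mlx_set_wired_limit. Assumes a single manager per process
// (multiple managers documented as undefined outside tests).
actor LimitCacheSketch {
    private var lastAppliedLimit: Int?

    func apply(_ limit: Int) {
        // mlx_set_wired_limit(&previous, size_t(limit))  // real backend call
        lastAppliedLimit = limit
    }

    /// Answers "what is the current limit?" from our own record, instead of
    /// temporarily setting the limit to 0 just to read back the old value.
    func currentLimit() -> Int? {
        lastAppliedLimit
    }
}
```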
```swift
///
/// These settings implement hysteresis to prevent small or frequent shrinks
/// while active work is running. Growing the limit is always allowed; shrinking
/// is gated by a minimum drop and a minimum time between changes.
```
Is this something you observed being a problem? If so, awesome -- it seems like it could be useful to keep active memory hot.
```swift
if let lastLimitChange {
    let elapsed = Date().timeIntervalSince(lastLimitChange)
    if elapsed < configuration.shrinkCooldown {
```
I have one concern here: the way I read this is we won't shrink unless another request comes through here?
Consider:
- request for N bytes of wired memory
- request is complete, we try to reduce back to 0, but the cooldown blocks the shrink
- no more requests come in
Does the process still hold the wired memory? I think so, but perhaps there is a path I didn't see.
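One way this could be handled, sketched below under the assumption that the PR does not already do it: when the cooldown blocks a shrink, schedule a deferred task that retries after the cooldown expires, so the wired memory is eventually released even with no further requests. `shrinkCooldown` follows the PR's configuration name; the pending-task bookkeeping is hypothetical.

```swift
import Foundation

// Sketch: a blocked shrink is not dropped, it is retried after the cooldown.
actor DeferredShrinkSketch {
    let shrinkCooldown: TimeInterval = 2.0
    private var lastLimitChange: Date?
    private var pendingShrink: Task<Void, Never>?

    func requestShrink() {
        if let lastLimitChange {
            let elapsed = Date().timeIntervalSince(lastLimitChange)
            if elapsed < shrinkCooldown {
                // Don't drop the request: retry once the cooldown expires.
                pendingShrink?.cancel()
                pendingShrink = Task { [remaining = shrinkCooldown - elapsed] in
                    try? await Task.sleep(nanoseconds: UInt64(remaining * 1e9))
                    guard !Task.isCancelled else { return }
                    await self.requestShrink()
                }
                return
            }
        }
        applyCurrentLimit()
        lastLimitChange = Date()
    }

    private func applyCurrentLimit() {
        // Recompute and apply the limit from active tickets (elided).
    }
}
```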
```swift
/// wired memory control is unsupported (e.g. CPU-only execution). The
/// manager will not attempt to change wired memory, but tickets can still
/// gate admission and emit events.
public var policyOnlyWhenUnsupported: Bool
```
Is there a benefit to having the default be false? Or even having this config -- why not just always do this? It seems harmless (the compute cost is minimal even if the result is a NOP).
```swift
/// Debug label for the policy group, if applicable.
public let policy: String?
/// Baseline wired limit captured from the system.
public let baseline: Int?
```
Maybe the term baseline needs an explicit definition? My concept is that this is the idle floor -- we don't go below this, even when there are no outstanding requests. But the unsupported case and the code seem to tie this to the max supported value -- I am not sure what the intent is.
I think the baseline today (without this code) is 0.
```swift
/// Stable grouping key for policies.
private enum PolicyKey: Hashable {
    case identifier(AnyHashable)
```
Do we need this enum? Why not just use `AnyHashable` as the key? This is internal to the actor, so if the implementation requires adding another level it is safe to do so without callers being aware.
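For illustration, keying directly on `AnyHashable` might look like this (a minimal sketch, not the PR's actual state):

```swift
import Foundation

// Sketch: AnyHashable is itself Hashable, so it can key the dictionary
// directly; a wrapper can be reintroduced later without callers noticing.
actor PolicyGroupsSketch {
    private var groups: [AnyHashable: [UUID]] = [:]

    func add(ticket id: UUID, toPolicy policyID: AnyHashable) {
        groups[policyID, default: []].append(id)
    }
}
```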
```swift
public func end(id: UUID, policy: any WiredMemoryPolicy) async -> Int {
    if let waiter = waiters.removeValue(forKey: id) {
        waiter.resume()
    }
```
There is a call to `resumeWaiters()` at the end that will awaken all the waiters. Why have a one-off here? (Not saying it is wrong, but it isn't clear to me -- if we need it, I think maybe it needs a comment.)
```swift
    waiter.resume()
}

guard WiredMemoryBackend.isSupported || policyOnlyMode else {
```
I think `isSupported` will return false if this is a Task set for the CPU device. A few questions:

- Will returning a 0 here affect the wired memory for the GPU tasks? Would a nil be better to indicate "no change"?
- The return value is more informative -- policy is applied by `applyCurrentLimit()`. Would returning `currentLimit ?? baseline ?? 0` from the end of the function make more sense?
- Is it OK that `resumeWaiters()` is not called (also consider this question if the one-off in a previous line isn't needed)? I think maybe this is OK because the wired memory probably doesn't change here.
```swift
}

guard let state = tickets.removeValue(forKey: id) else {
    emit(kind: .ticketEndIgnored, ticketID: id)
```
What is the case where this would happen? I wonder if this should be a fatalError -- is some invariant violated here (like we lost a ticket)?
I think at one point it was mentioned that `end` should be idempotent. Should it? Or is that papering over programming errors? `free()` isn't idempotent and is a similar resource-release idea.
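A sketch of the stricter, `free()`-like alternative, with illustrative names (the PR's ticket state type is richer than an `Int` size):

```swift
import Foundation

// Sketch: treat a double end as a programming error rather than ignoring it.
func endStrict(id: UUID, tickets: inout [UUID: Int]) -> Int {
    guard let size = tickets.removeValue(forKey: id) else {
        // Invariant violated: the ticket was ended twice or never started.
        preconditionFailure("end called for unknown ticket \(id)")
    }
    return size
}
```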
```swift
    continuation.onTermination = { _ in
        Task { await self.removeEventContinuation(id: id) }
    }
}
```
I think this is safe -- the call to the AsyncStream initializer is synchronous, so the interior happens before it returns. It surprised me because I was used to this form: `let (stream, continuation) = AsyncStream<Generation>.makeStream()`. I don't have a strong preference -- I think they are the same, just pointing it out as I had to read to make sure I understood.
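For reference, the two forms side by side (both run synchronously; `makeStream()` just returns the continuation instead of handing it to a closure):

```swift
// Form 1: continuation handed to the build closure inside the initializer.
let streamA = AsyncStream<Int> { continuation in
    continuation.onTermination = { _ in /* cleanup */ }
}

// Form 2: continuation returned alongside the stream (Swift 5.9+).
let (streamB, continuationB) = AsyncStream<Int>.makeStream()
continuationB.onTermination = { _ in /* cleanup */ }
```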
```swift
var baselineValue = ensureBaseline(refresh: baseline == nil || !hasActiveWork())
if tickets[id] != nil {
    emit(
        kind: .ticketStartIgnored,
```
Similar question as on `end` -- a double start might be a programming error. Is there a use case for allowing this?
```swift
///
/// If this was the last ticket, the manager restores the baseline and
/// clears internal state.
public func end(id: UUID, policy: any WiredMemoryPolicy) async -> Int {
```
This is marked as async but I do not see any await inside. Callers (outside the actor) have to await anyway as that is what the actor requires, but this would also force callers inside the actor (if there were any) to 1) be async and 2) have a potential suspension point, which requires a little more care.
I think the async should be removed.
```swift
if WiredMemoryBackend.isSupported {
    return WiredMemoryBackend.readCurrentLimit() ?? 0
```
Could we use `currentLimit` here? I know there is an issue if you have multiple managers, but we can document that as undefined (it seems useless except for tests).
In fact, I wonder ... we could make the `init()` on the actor be non-public to enforce this. The tests could still create multiple. Or add an obvious path like `static func newTestPolicy()` that would be more noticeable if somebody tried multiple in a real program?
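Sketched out, the suggestion might look like this -- one blessed instance plus a loudly named test-only factory (the factory name adapts the `newTestPolicy()` idea; everything here is illustrative):

```swift
// Sketch: a single process-wide manager with a non-public init.
public actor WiredMemoryManagerSketch {
    /// The single process-wide manager real programs should use.
    public static let shared = WiredMemoryManagerSketch()

    // Non-public init prevents casual creation of a second manager.
    init() {}

    /// Test-only escape hatch; multiple managers are undefined in production.
    public static func newTestManager() -> WiredMemoryManagerSketch {
        WiredMemoryManagerSketch()
    }
}
```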
```swift
    return 0
}

private func ensureBaseline(refresh: Bool) -> Int {
```
This is not taking an action, it is computing the current baseline. `ensure` sounds like it is applying. I see it sits on top of `resolve` so we are already using that name :-). The key here is that it notices when the resolved baseline changes. `resolveBaselineAndEmit`? I am not great with the names (and it is private so ultimately just needed for clarity), see what you think.
> ```
> popd
> ```
>
> ## Wired Memory Management
These docs are well written but I don't think we need them at the top level of the README. I wonder if this would be better in Source/MLX/Documentation.docc/MLX.md (the top level of the MLX docs where there are pointers to some of the articles with more information). This README is much higher level.
```swift
    size: Int,
    policy: any WiredMemoryPolicy,
    kind: WiredMemoryTicketKind
) async -> Int {
```
Unlike `end`, this one must be async as it can suspend per the `admit` on the policy.
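To illustrate why the suspension is inherent: an admission gate typically parks callers on a continuation until capacity frees up, roughly like this hypothetical sketch (not the PR's implementation):

```swift
// Sketch: admission may suspend the caller, so start() is genuinely async.
actor AdmissionGateSketch {
    private var available: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(capacity: Int) { self.available = capacity }

    func admit(size: Int) async {
        while available < size {
            // Suspend here; this is the potential suspension point that
            // makes start() async, unlike end().
            await withCheckedContinuation { waiters.append($0) }
        }
        available -= size
    }

    func release(size: Int) {
        available += size
        let resumed = waiters
        waiters.removeAll()
        for w in resumed { w.resume() }  // waiters re-check capacity
    }
}
```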
> @@ -0,0 +1,121 @@
> # Wired Memory Management
>
> Coordinate a process-wide wired memory limit for GPU workloads.
I think we need a few additions:

- deprecate `Memory.withWired` and `GPU.withWired`
- the sync version of that can be documented as being a NOP
- the async version can use a static policy, e.g. `WiredSumPolicy` (document)

We don't want a back door that will let people bypass the mechanism.
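A sketch of what the deprecation could look like, with illustrative names and signature (the real `Memory.withWired`/`GPU.withWired` shapes may differ):

```swift
// Sketch of the deprecation idea; GPUSketch stands in for the real type.
enum GPUSketch {
    @available(*, deprecated, message: "Use WiredMemoryManager tickets instead")
    static func withWired<R>(limit: Int, _ body: () async throws -> R) async rethrows -> R {
        // Real version would start a ticket on a shared static policy,
        // run body inside ticket.withWiredLimit, then end the ticket,
        // so callers cannot bypass the coordination mechanism.
        try await body()
    }
}
```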
Looks really good! Check out my comments & questions and see what you think.

Proposed changes
Implements option 3 for #347 by moving the wired memory coordinator into mlx-swift and exposing a generic manager/policy/ticket API.
- Adds `WiredMemoryManager`, `WiredMemoryTicket`, `WiredMemoryPolicy`, and `WiredMemoryEvent`.
- Policies expose an `id` (Identifiable) for grouping.
- Adds a documentation article (`wired-memory.md`) plus README/MLX doc updates.

Checklist

Put an `x` in the boxes that apply.

- `pre-commit run --all-files` to format my code / installed pre-commit prior to committing changes