diff --git a/analysis.md b/analysis.md new file mode 100644 index 000000000000..d571bbdec04e --- /dev/null +++ b/analysis.md @@ -0,0 +1,205 @@ +# CPU Memory Leak Analysis (GitHub Issue #28726) + +## Summary + +vLLM suffers from continuous CPU memory growth when serving multimodal (VLM) +models with prefix caching enabled (the default). The EngineCore subprocess +RSS grows by ~1.5 GB per 1000 requests and never stabilizes, eventually +causing OOM. The issue appeared between v0.11.0 and v0.11.1. + +**Root cause**: A reference cycle in `Request` objects prevents Python's +reference counting from freeing them. A GC optimization introduced in v0.11.1 +reduces how often the cyclic garbage collector runs, causing these cyclic +`Request` objects (each holding megabytes of multimodal feature data) to +accumulate far faster than the GC can reclaim them. + +## The Reference Cycle + +In `vllm/v1/request.py`, when prefix caching is enabled, each `Request` binds +itself into a `functools.partial`: + +```python +# vllm/v1/request.py:167-170 (main branch) +self.get_hash_new_full_blocks: Callable[[], list[BlockHash]] | None = None +if block_hasher is not None: + self.get_hash_new_full_blocks = partial(block_hasher, self) # <-- cycle + self.block_hashes = self.get_hash_new_full_blocks() +``` + +This creates a **reference cycle**: + +``` +Request ──(self.get_hash_new_full_blocks)──> partial object + ^ │ + └──────────(partial stores self as arg)────────┘ +``` + +### Why this matters + +Python uses two garbage collection mechanisms: + +1. **Reference counting** (immediate): When an object's reference count drops + to zero, it is freed instantly. This is the fast path. + +2. **Cyclic garbage collector** (deferred): Periodically scans for groups of + objects that reference each other but are unreachable from the rest of the + program. This is the slow path, and it runs based on heuristic thresholds. + +The reference cycle means that when the scheduler finishes a request and does +`del self.requests[request_id]`, the `Request` object's reference count +**does not drop to zero** -- the `partial` still holds a reference. The +`partial`'s count doesn't drop to zero either -- the `Request` still holds +it. Both objects are unreachable from the program, but neither can be freed +by reference counting. They become **cyclic garbage**, waiting for the cyclic +GC to detect and collect them. + +### Why it only affects prefix caching + +When prefix caching is **disabled**, `block_hasher` is `None`, so the +`partial` is never created. There is no cycle. `Request` objects are freed +immediately by reference counting when the scheduler removes them. This is +why `--no-enable-prefix-caching` prevents the leak. + +### Why it only affects multimodal models visibly + +Each `Request` object holds a `mm_features: list[MultiModalFeatureSpec]` +field. For vision-language models, this contains the **processed image +feature tensors** -- several megabytes per image. A text-only request has +empty `mm_features` and is only a few kilobytes. When cyclic garbage +accumulates: + +- **Text-only**: 100 leaked Request objects ~ a few MB (invisible) +- **VLM with images**: 100 leaked Request objects ~ hundreds of MB to GBs + (causes OOM) + +## Why It Became a Problem in v0.11.1 + +The reference cycle existed since prefix caching was introduced. In v0.11.0, +it was harmless because the cyclic GC ran frequently enough to clean it up. +Two changes in v0.11.1 broke this equilibrium: + +### Change 1: Fewer GC-tracked objects per request (primary cause) + +**Commit `acaa2c0a4`** -- *"Reuse empty block lists whenever possible in +KVCacheBlocks to mitigate GC costs"* + +This optimization replaced empty `list` objects (`[]`) with empty `tuple` +objects (`()`) in `KVCacheBlocks`. Empty tuples are **not tracked by the +cyclic GC** (CPython optimization), while empty lists are. This means each +request cycle creates fewer GC-tracked objects. + +Python's cyclic GC uses a generational scheme with thresholds (default: +`(700, 10, 10)`): + +- **Generation 0** collection triggers when 700+ new tracked objects + accumulate since the last gen-0 collection. +- **Generation 1** triggers every 10 gen-0 collections. +- **Generation 2** triggers every 10 gen-1 collections (every 100 gen-0's). + +With fewer tracked objects created per request, it takes longer for the +generation-0 threshold (700 objects) to be reached. This means: +- Gen-0 collections happen less often +- Gen-1 and gen-2 collections happen much less often +- Cyclic garbage from `Request` objects accumulates longer before being swept + +In v0.11.0, the extra `list` objects from `KVCacheBlocks` kept the GC +running frequently. Gen-2 collections (which sweep long-lived cyclic +garbage) ran often enough that the leaked `Request` memory stabilized. +In v0.11.1, gen-2 collections became too infrequent, and memory grew +without bound. + +### Change 2: Earlier gc.freeze() (secondary contributor) + +**Commit `b30372cbd`** -- *"Move gc.freeze logic from EngineCoreProc to +EngineCore for better coverage"* + +`gc.freeze()` moves all currently tracked objects into a permanent +generation that the GC never scans. This was moved from the end of +`EngineCoreProc.__init__()` to the end of `EngineCore.__init__()`, +freezing objects earlier. While this doesn't directly prevent collection +of new `Request` objects, the different freeze timing subtly changes the +GC's generation accounting, further reducing the frequency of collections +on unfrozen objects. + +## Reproduction Results + +Using `Qwen/Qwen2.5-VL-3B-Instruct` with the `lmarena-ai/VisionArena-Chat` +dataset (real user-uploaded images), 1000 prompts per round, prefix caching +enabled: + +### main branch (leak present) + +``` +Round Reqs Total(GB) EC(GB) EC delta EC round Time +---------------------------------------------------------------------- +idle 0 3.63 3.63 --- --- +1 1000 10.97 10.97 +7.33 +7.33 66s +2 2000 14.34 14.34 +10.71 +3.37 57s +3 3000 15.94 15.94 +12.31 +1.60 55s +4 4000 16.91 16.91 +13.28 +0.97 58s +5 5000 17.38 17.38 +13.74 +0.47 58s + +EngineCore final RSS: 14.70 GB (started at 2.40 GB) +Growth rate: +1.60 GB/round average -- NEVER STABILIZES +``` + +### fix-cpu-leak branch (leak fixed) + +``` +Round Reqs Total(GB) EC(GB) EC delta EC round Time +---------------------------------------------------------------------- +idle 0 3.63 3.63 --- --- +1 1000 9.86 9.86 +6.22 +6.22 67s +2 2000 10.50 10.50 +6.86 +0.64 56s +3 3000 10.55 10.55 +6.91 +0.05 57s +4 4000 10.55 10.55 +6.92 +0.01 56s +5 5000 10.64 10.64 +7.01 +0.09 56s + +EngineCore final RSS: 7.51 GB (started at 2.41 GB) +Growth rate: +0.20 GB/round average -- STABLE after round 1 +``` + +The fix reduces EngineCore memory by **half** (7.51 GB vs 14.70 GB) after +5000 multimodal requests. + +## The Fix + +**Break the reference cycle** by storing `block_hasher` directly without +`partial`, and passing `self` explicitly at call sites: + +```python +# BEFORE (creates cycle): +self.get_hash_new_full_blocks = partial(block_hasher, self) +self.block_hashes = self.get_hash_new_full_blocks() + +# AFTER (no cycle): +self._block_hasher = block_hasher +self.block_hashes = self._block_hasher(self) +``` + +Without the cycle, `Request` objects are freed **immediately** by reference +counting when the scheduler removes them -- no cyclic GC needed. This +eliminates the leak regardless of GC frequency or `gc.freeze()` behavior. + +### Files changed + +- **`vllm/v1/request.py`** -- Store `_block_hasher` instead of + `partial(block_hasher, self)`. Update `append_output_token_ids()` to call + `self._block_hasher(self)`. +- **`vllm/v1/core/sched/scheduler.py`** -- Update session block hash call + site from `session.get_hash_new_full_blocks()` to + `session._block_hasher(session)`. +- **`tests/v1/core/test_async_scheduler.py`** -- Update test call site. + +### Verification (unit test) + +With cyclic GC disabled (`gc.disable()`), create 100 Request objects with +prefix caching and delete all external references: + +| | main (cycle) | fix (no cycle) | +|---|---|---| +| Objects alive after `del` | **100** (all leaked) | **0** (all freed) | +| Freed by `gc.collect()` | 100 | 0 (nothing to collect) | + +All 137 existing tests pass (86 scheduler + 43 kv_cache_utils + 8 async +scheduler). diff --git a/test_mm_memory_leak.py b/test_mm_memory_leak.py new file mode 100644 index 000000000000..d866359cffcf --- /dev/null +++ b/test_mm_memory_leak.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 +""" +Reproduction script for CPU memory leak with multimodal models (issue #28726). + +Uses the EXACT reproduction from the issue report: + - vllm serve Qwen2.5-VL-3B-Instruct --max-model-len 25000 + - vllm bench serve with lmarena-ai/VisionArena-Chat dataset (1000 prompts) + +Measures the ENGINE CORE subprocess RSS specifically (the EngineCore runs in +a child process in V1 architecture, and that's where Request objects live). + +The leak is caused by a reference cycle in Request objects: + Request -> partial(block_hasher, self) -> Request + +Usage: + python test_mm_memory_leak.py # run on current branch + python test_mm_memory_leak.py --rounds 3 # fewer rounds + +Requirements: + - GPU with enough VRAM for Qwen2.5-VL-3B-Instruct (~8GB) + - Model cached at ~/.cache/huggingface/hub/ + - Internet access (first run downloads VisionArena-Chat dataset) +""" + +import argparse +import json +import os +import re +import subprocess +import sys +import tempfile +import time +import urllib.error +import urllib.request +from pathlib import Path + +os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0") + +MODEL = "Qwen/Qwen2.5-VL-3B-Instruct" +PORT = 29345 +HOST = f"http://localhost:{PORT}" + + +def get_process_tree_rss(root_pid: int) -> dict[int, float]: + """Get RSS for every process in the tree. Returns {pid: rss_gb}.""" + result = {} + pids = _get_all_descendant_pids(root_pid) + pids.add(root_pid) + for pid in pids: + try: + with open(f"/proc/{pid}/status") as f: + rss_kb = 0 + cmdline = "" + for line in f: + if line.startswith("VmRSS:"): + rss_kb = int(line.split()[1]) + break + # Also get command name + try: + cmdline = Path(f"/proc/{pid}/cmdline").read_text().replace("\0", " ")[:100] + except Exception: + cmdline = "?" + result[pid] = (rss_kb / (1024 * 1024), cmdline) + except (FileNotFoundError, ProcessLookupError, PermissionError): + pass + return result + + +def get_tree_total_rss_gb(root_pid: int) -> float: + tree = get_process_tree_rss(root_pid) + return sum(rss for rss, _ in tree.values()) + + +def get_engine_core_rss_gb(root_pid: int) -> float: + """Find and return RSS of the EngineCore subprocess.""" + tree = get_process_tree_rss(root_pid) + for pid, (rss_gb, cmdline) in tree.items(): + if pid != root_pid and "python" in cmdline.lower() and rss_gb > 0.1: + # The engine core subprocess is the largest Python child process + return rss_gb + # Fallback: return total tree RSS + return sum(rss for rss, _ in tree.values()) + + +def _get_all_descendant_pids(pid: int) -> set[int]: + children = set() + try: + result = subprocess.run( + ["pgrep", "-P", str(pid)], capture_output=True, text=True, + ) + for line in result.stdout.strip().splitlines(): + if line.strip(): + child = int(line.strip()) + children.add(child) + children.update(_get_all_descendant_pids(child)) + except Exception: + pass + return children + + +def wait_for_server(timeout: int = 300) -> bool: + start = time.time() + while time.time() - start < timeout: + try: + req = urllib.request.Request(f"{HOST}/health") + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status == 200: + return True + except (urllib.error.URLError, ConnectionRefusedError, OSError): + pass + time.sleep(3) + return False + + +def run_bench(num_prompts: int) -> tuple[float, bool]: + """Run vllm bench serve with VisionArena-Chat. Returns (elapsed, success).""" + cmd = [ + sys.executable, "-c", + "import sys; " + f"sys.argv = ['vllm', 'bench', 'serve', " + f"'--backend', 'openai-chat', " + f"'--model', '{MODEL}', " + f"'--endpoint', '/v1/chat/completions', " + f"'--dataset-name', 'hf', " + f"'--dataset-path', 'lmarena-ai/VisionArena-Chat', " + f"'--hf-split', 'train', " + f"'--num-prompts', '{num_prompts}', " + f"'--port', '{PORT}']; " + "from vllm.entrypoints.cli.main import main; main()", + ] + t0 = time.time() + result = subprocess.run(cmd, capture_output=True, text=True, timeout=1200) + elapsed = time.time() - t0 + success = result.returncode == 0 + if not success: + err = (result.stderr or "")[-500:] + (result.stdout or "")[-500:] + print(f" Bench output: {err}") + return elapsed, success + + +def print_process_tree(root_pid: int, label: str): + """Print RSS breakdown of all processes in the tree.""" + tree = get_process_tree_rss(root_pid) + print(f"\n Process tree ({label}):") + for pid, (rss_gb, cmdline) in sorted(tree.items()): + short_cmd = cmdline[:60] + print(f" PID {pid:>7}: {rss_gb:6.2f} GB {short_cmd}") + total = sum(rss for rss, _ in tree.values()) + print(f" {'TOTAL':>11}: {total:6.2f} GB") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--rounds", type=int, default=5) + parser.add_argument("--prompts", type=int, default=1000) + args = parser.parse_args() + + num_rounds = args.rounds + prompts_per_round = args.prompts + + print("=" * 70) + print(" CPU Memory Leak Reproduction - Multimodal + Prefix Caching") + print(" (using lmarena-ai/VisionArena-Chat dataset)") + print("=" * 70) + + repo_dir = os.path.dirname(os.path.abspath(__file__)) + branch = subprocess.run( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + capture_output=True, text=True, cwd=repo_dir, + ).stdout.strip() + commit = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + capture_output=True, text=True, cwd=repo_dir, + ).stdout.strip() + print(f"Branch: {branch} ({commit})") + print(f"Model: {MODEL}") + print(f"Config: {num_rounds} rounds x {prompts_per_round} prompts") + print(f" prefix caching ON, max-model-len 25000") + print() + + # --- Start vLLM server --- + log_file = tempfile.NamedTemporaryFile( + mode="w", prefix="vllm_server_", suffix=".log", delete=False, + ) + log_path = log_file.name + print(f"Starting vLLM server (log: {log_path})...") + + server_cmd = [ + sys.executable, "-m", "vllm.entrypoints.openai.api_server", + "--model", MODEL, + "--port", str(PORT), + "--max-model-len", "25000", + "--gpu-memory-utilization", "0.95", + "--limit-mm-per-prompt", json.dumps({"image": 1, "video": 0}), + "--mm-processor-kwargs", json.dumps({ + "min_pixels": 28 * 28, + "max_pixels": 1280 * 28 * 28, + }), + ] + + server_proc = subprocess.Popen( + server_cmd, stdout=log_file, stderr=subprocess.STDOUT, + ) + + try: + if not wait_for_server(timeout=300): + print("ERROR: Server failed to start within timeout.") + server_proc.kill() + server_proc.wait(timeout=10) + log_file.close() + print("Server log (last 3000 chars):") + print(Path(log_path).read_text()[-3000:]) + return 1 + + server_pid = server_proc.pid + rss_idle = get_tree_total_rss_gb(server_pid) + ec_idle = get_engine_core_rss_gb(server_pid) + print(f"Server ready (PID {server_pid}).") + print_process_tree(server_pid, "idle") + + # --- Run benchmark rounds --- + print(f"\n{'Round':<7} {'Reqs':<8} {'Total(GB)':<11} {'EC(GB)':<9} " + f"{'EC delta':<10} {'EC round':<10} {'Time'}") + print("-" * 70) + print(f"{'idle':<7} {0:<8} {rss_idle:<11.2f} {ec_idle:<9.2f} " + f"{'---':<10} {'---':<10}") + + ec_history = [ec_idle] + rss_history = [rss_idle] + + for round_num in range(1, num_rounds + 1): + elapsed, success = run_bench(prompts_per_round) + + if not success: + print(f" Round {round_num} bench had errors, continuing...") + + time.sleep(3) + + rss_now = get_tree_total_rss_gb(server_pid) + ec_now = get_engine_core_rss_gb(server_pid) + ec_delta = ec_now - ec_idle + ec_round = ec_now - ec_history[-1] + ec_history.append(ec_now) + rss_history.append(rss_now) + + total_reqs = round_num * prompts_per_round + print(f"{round_num:<7} {total_reqs:<8} {rss_now:<11.2f} " + f"{ec_now:<9.2f} " + f"{'+' if ec_delta >= 0 else ''}{ec_delta:<9.2f} " + f"{'+' if ec_round >= 0 else ''}{ec_round:<9.2f} " + f"{elapsed:.0f}s" + f"{' (FAIL)' if not success else ''}") + + # Print final tree. + print_process_tree(server_pid, "final") + + # --- Summary --- + print() + print("=" * 70) + ec_growth = ec_history[-1] - ec_history[1] + ec_avg = ec_growth / max(num_rounds - 1, 1) + total_growth = rss_history[-1] - rss_history[1] + total_avg = total_growth / max(num_rounds - 1, 1) + + if ec_avg > 0.3 or total_avg > 0.5: + verdict = "\033[91mLEAK DETECTED\033[0m" + else: + verdict = "\033[92mMEMORY STABLE\033[0m" + + print(f"Result: {verdict}") + print(f" Branch: {branch} ({commit})") + print(f" Total RSS idle: {rss_idle:.2f} GB") + print(f" Total RSS after R1: {rss_history[1]:.2f} GB (warmup)") + print(f" Total RSS after R{num_rounds}: {rss_history[-1]:.2f} GB") + print(f" Total growth R2-{num_rounds}: {total_growth:+.2f} GB " + f"(avg {total_avg:+.2f} GB/round)") + print() + print(f" EngineCore RSS idle: {ec_idle:.2f} GB") + print(f" EngineCore after R1: {ec_history[1]:.2f} GB (warmup)") + print(f" EngineCore after R{num_rounds}: {ec_history[-1]:.2f} GB") + print(f" EngineCore growth: {ec_growth:+.2f} GB " + f"(avg {ec_avg:+.2f} GB/round)") + + if ec_avg > 0.3 or total_avg > 0.5: + print() + print(" The EngineCore subprocess RSS keeps growing.") + print(" This confirms the CPU memory leak from the Request") + print(" reference cycle (partial(block_hasher, self)).") + else: + print() + print(" Memory is stable after warmup - no leak.") + + print("=" * 70) + return 0 + + finally: + print("\nShutting down server...") + server_proc.terminate() + try: + server_proc.wait(timeout=15) + except subprocess.TimeoutExpired: + server_proc.kill() + server_proc.wait(timeout=5) + log_file.close() + print("Done.") + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/v1/core/test_async_scheduler.py b/tests/v1/core/test_async_scheduler.py index e0645ed43015..8c09ec7b2b6a 100644 --- a/tests/v1/core/test_async_scheduler.py +++ b/tests/v1/core/test_async_scheduler.py @@ -236,7 +236,7 @@ def test_prefix_caching_for_multi_turn(): req._all_token_ids = req.prompt_token_ids.copy() req.all_token_ids = ConstantList(req._all_token_ids) req.block_hashes = [] - req.block_hashes = req.get_hash_new_full_blocks() + req.block_hashes = req._block_hasher(req) # Schedule the next-turn requests. for req in next_turn_requests: diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index 90ca584410df..b807d051ccfa 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -984,8 +984,8 @@ def _update_request_as_session( session.prompt_token_ids.extend(update.prompt_token_ids or ()) # Update block hashes for the new tokens # (mirrors Request.append_output_token_ids) - if session.get_hash_new_full_blocks is not None: - session.block_hashes.extend(session.get_hash_new_full_blocks()) + if session._block_hasher is not None: + session.block_hashes.extend(session._block_hasher(session)) session.num_prompt_tokens = len(session.prompt_token_ids) session.arrival_time = update.arrival_time session.sampling_params = update.sampling_params diff --git a/vllm/v1/request.py b/vllm/v1/request.py index 3b829875f390..09f42edfed3f 100644 --- a/vllm/v1/request.py +++ b/vllm/v1/request.py @@ -6,7 +6,6 @@ from collections import deque from collections.abc import Callable, Mapping from dataclasses import dataclass -from functools import partial from typing import TYPE_CHECKING, Any import torch @@ -164,10 +163,12 @@ def __init__( self.num_external_computed_tokens = 0 self.block_hashes: list[BlockHash] = [] - self.get_hash_new_full_blocks: Callable[[], list[BlockHash]] | None = None - if block_hasher is not None: - self.get_hash_new_full_blocks = partial(block_hasher, self) - self.block_hashes = self.get_hash_new_full_blocks() + # Store the block hasher without binding self to avoid creating a + # reference cycle (Request -> partial -> Request) that prevents + # immediate garbage collection via reference counting. + self._block_hasher: Callable[["Request"], list[BlockHash]] | None = block_hasher + if self._block_hasher is not None: + self.block_hashes = self._block_hasher(self) self.skip_reading_prefix_cache = self.get_skip_reading_prefix_cache() @@ -212,8 +213,8 @@ def append_output_token_ids( self._output_token_ids.extend(token_ids) self._all_token_ids.extend(token_ids) - if self.get_hash_new_full_blocks is not None: - self.block_hashes.extend(self.get_hash_new_full_blocks()) + if self._block_hasher is not None: + self.block_hashes.extend(self._block_hasher(self)) @property def use_structured_output(self) -> bool: