Skip to content
25 changes: 25 additions & 0 deletions linex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@ for line in profiler.source_lines[:5]:
print(f" {line.total_cycles:,} cycles ({line.stall_percent:.1f}% stalled)")
```

## Distributed Launchers

Linex supports distributed profiling with launchers like `torchrun`, `mpirun`,
`srun`, and `horovodrun`. Pass the launcher separately so Linex builds the
correct command order (`launcher rocprofv3 ... -- app`).

```python
profiler = Linex()
profiler.profile(
command="train.py",
launcher="torchrun --nproc_per_node=8",
output_dir="linex_sqtt",
)

print(profiler.distributed_context.global_rank)
for rank_key, rank_profile in profiler.rank_profiles.items():
print(rank_key, len(rank_profile.source_lines))
```

In distributed mode, Linex writes each rank's traces into its own subdirectory
(`.../rank0000`, `.../rank0001`, ...) so concurrent ranks do not overwrite each
other's output. Rank metadata is detected automatically from the environment
variables the launcher sets (e.g. `RANK`, `LOCAL_RANK`, `WORLD_SIZE`).

## What You Get

**Instruction-level metrics mapped to source lines:**
Expand Down Expand Up @@ -66,6 +89,8 @@ profiler = Linex(
**Properties:**
- `source_lines` - List[SourceLine] sorted by total_cycles
- `instructions` - List[InstructionData]
- `rank_profiles` - Per-rank profiling data for distributed runs
- `distributed_context` - Detected launcher/rank metadata

### SourceLine

Expand Down
4 changes: 2 additions & 2 deletions linex/src/linex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
providing cycle counts and performance metrics per source line.
"""

from .api import Linex, SourceLine, InstructionData
from .api import InstructionData, Linex, RankProfile, SourceLine

__version__ = "0.1.0"
__all__ = ["Linex", "SourceLine", "InstructionData"]
__all__ = ["Linex", "SourceLine", "InstructionData", "RankProfile"]
149 changes: 122 additions & 27 deletions linex/src/linex/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
"""

import json
import os
import subprocess
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Sequence

from .distributed import DistributedContext, detect_distributed_context, normalize_command_argv


@dataclass
Expand Down Expand Up @@ -99,6 +102,21 @@ def stall_percent(self) -> float:
return 100.0 * self.stall_cycles / self.total_cycles if self.total_cycles > 0 else 0.0


@dataclass
class RankProfile:
    """Per-rank trace data produced by a Linex profiling run."""

    # Unique key for this profile; the detected rank tag, with a
    # "_dispatchNNN" suffix appended for additional ui_output directories.
    rank_key: str
    # Rank metadata copied from the DistributedContext detected at profile time.
    global_rank: int
    local_rank: int
    world_size: int
    hostname: str
    launcher: str
    # Filesystem path (as a string) of the ui_output_* directory this data came from.
    ui_output_dir: str
    # Aggregated per-source-line metrics, sorted by total_cycles descending.
    source_lines: List[SourceLine]
    # Raw per-instruction records parsed from the trace's code.json.
    instructions: List[InstructionData]


class Linex:
"""
Linex - Source-Level GPU Performance Profiler
Expand Down Expand Up @@ -143,6 +161,8 @@ def __init__(
# Data storage
self._instructions: List[InstructionData] = []
self._source_lines: Dict[str, SourceLine] = {}
self._rank_profiles: Dict[str, RankProfile] = {}
self._distributed_context: DistributedContext = DistributedContext()

def _ensure_decoder(self) -> Path:
"""Ensure decoder library is available, download if needed."""
Expand All @@ -158,10 +178,12 @@ def _ensure_decoder(self) -> Path:

def profile(
self,
command: str,
command: str | Sequence[str],
output_dir: Optional[str] = None,
kernel_filter: Optional[str] = None,
force_cu_mask: bool = True,
env: Optional[Dict[str, str]] = None,
launcher: Optional[str | Sequence[str]] = None,
) -> "Linex":
"""
Profile an application and collect source-level performance data.
Expand All @@ -177,15 +199,39 @@ def profile(
kernel_filter: Regex filter for kernel names (default: None = all kernels)
force_cu_mask: Force waves to target CU using HSA_CU_MASK (default: True)

Note:
When using ``launcher``, the launcher spawns child processes that
each run rocprofv3. Rank metadata is detected from environment
variables (RANK, LOCAL_RANK, WORLD_SIZE, etc.) set by the launcher
in each child process. This means ``launcher`` is most useful when
Linex itself runs *inside* a launched process (e.g.,
``torchrun --no-python ... linex-cli ...``), not when calling
``Linex().profile(launcher=...)`` from a non-distributed parent.

Returns:
self for chaining
"""
import os
import tempfile

# Use temp directory if not specified
if output_dir is None:
output_dir = tempfile.mkdtemp(prefix="linex_")
base_output_dir = Path(tempfile.mkdtemp(prefix="linex_"))
else:
base_output_dir = Path(output_dir)

run_env = os.environ.copy()
if env:
run_env.update(env)

dist_context = detect_distributed_context(run_env)
self._distributed_context = dist_context

output_path = base_output_dir
if dist_context.is_distributed:
output_path = base_output_dir / dist_context.rank_tag
output_path.mkdir(parents=True, exist_ok=True)

command_argv = normalize_command_argv(command)

cmd = [
"rocprofv3",
Expand All @@ -199,37 +245,67 @@ def profile(
"--att-shader-engine-mask",
self.shader_engine_mask,
"-d",
output_dir,
str(output_path),
]

if kernel_filter:
cmd.extend(["--kernel-include-regex", kernel_filter])

cmd.extend(["--", *command.split()])
cmd.extend(["--", *command_argv])

env = os.environ.copy()
if force_cu_mask:
env["HSA_CU_MASK"] = "0x1" # Force to CU 0
# If a launcher is specified, prepend it: launcher rocprofv3 ... -- app
if launcher is not None:
launcher_argv = normalize_command_argv(launcher)
cmd = launcher_argv + cmd

result = subprocess.run(cmd, env=env, capture_output=True, text=True)
if force_cu_mask and "HSA_CU_MASK" not in run_env:
run_env["HSA_CU_MASK"] = "0x1" # Force to CU 0 unless caller already set a mask

result = subprocess.run(cmd, env=run_env, capture_output=True, text=True)

if result.returncode != 0:
# Include stderr in error message for debugging
raise RuntimeError(f"rocprofv3 failed with code {result.returncode}\n{result.stderr}")

# Find generated ui_output directory
output_path = Path(output_dir)
ui_dirs = list(output_path.glob("ui_output_*"))
ui_dirs = sorted(output_path.glob("ui_output_*"), key=lambda p: p.name)

if not ui_dirs:
raise RuntimeError(f"No ui_output directories found in {output_dir}")
raise RuntimeError(f"No ui_output directories found in {output_path}")

self._rank_profiles = {}
for idx, ui_dir in enumerate(ui_dirs):
instructions, source_lines = self._load_ui_output_data(ui_dir)
if idx == 0:
rank_key = dist_context.rank_tag
else:
rank_key = f"{dist_context.rank_tag}_dispatch{idx + 1:03d}"
self._rank_profiles[rank_key] = RankProfile(
rank_key=rank_key,
global_rank=dist_context.global_rank,
local_rank=dist_context.local_rank,
world_size=dist_context.world_size,
hostname=dist_context.hostname,
launcher=dist_context.launcher,
ui_output_dir=str(ui_dir),
source_lines=sorted(
source_lines.values(), key=lambda x: x.total_cycles, reverse=True
),
instructions=instructions,
)

# Load the first dispatch
self._load_ui_output(ui_dirs[0])
# Preserve existing API behavior by exposing the first rank profile as top-level fields.
primary_rank = next(iter(self._rank_profiles.values()))
self._instructions = primary_rank.instructions
self._source_lines = {
line.source_location: line for line in primary_rank.source_lines if line.source_location
}
return self

def _load_ui_output(self, ui_output_dir: Path) -> None:
"""Internal: Load performance trace from ui_output directory."""
def _load_ui_output_data(
self, ui_output_dir: Path
) -> tuple[List[InstructionData], Dict[str, SourceLine]]:
"""Internal: Load performance trace data from ui_output directory."""
code_file = ui_output_dir / "code.json"

if not code_file.exists():
Expand All @@ -244,7 +320,7 @@ def _load_ui_output(self, ui_output_dir: Path) -> None:
)

# Parse instructions
self._instructions = []
instructions: List[InstructionData] = []
for entry in data["code"]:
inst = InstructionData(
isa=entry[0],
Expand All @@ -257,21 +333,27 @@ def _load_ui_output(self, ui_output_dir: Path) -> None:
stall_cycles=entry[8],
idle_cycles=entry[9],
)
self._instructions.append(inst)
instructions.append(inst)

# Aggregate by source line
self._aggregate_source_lines()
return instructions, self._aggregate_source_lines(instructions)

def _load_ui_output(self, ui_output_dir: Path) -> None:
    """Backward-compatible loader for a single ui_output directory."""
    # Delegate parsing to the shared helper, then publish the results
    # as the instance-level (single-rank) fields.
    parsed_instructions, parsed_lines = self._load_ui_output_data(ui_output_dir)
    self._instructions = parsed_instructions
    self._source_lines = parsed_lines

def _aggregate_source_lines(self):
def _aggregate_source_lines(self, instructions: Optional[List[InstructionData]] = None):
"""Aggregate instruction data by source line."""
self._source_lines = {}
source_lines: Dict[str, SourceLine] = {}
instructions_to_aggregate = self._instructions if instructions is None else instructions

for inst in self._instructions:
for inst in instructions_to_aggregate:
source = inst.source_location
if not source or source.startswith(";"):
continue

if source not in self._source_lines:
if source not in source_lines:
# Parse file:line from source
if ":" in source:
parts = source.rsplit(":", 1)
Expand All @@ -285,7 +367,7 @@ def _aggregate_source_lines(self):
file = source
line = 0

self._source_lines[source] = SourceLine(
source_lines[source] = SourceLine(
file=file,
line_number=line,
source_location=source,
Expand All @@ -296,12 +378,15 @@ def _aggregate_source_lines(self):
instructions=[],
)

sl = self._source_lines[source]
sl = source_lines[source]
sl.execution_count += inst.execution_count
sl.total_cycles += inst.latency_cycles
sl.stall_cycles += inst.stall_cycles
sl.idle_cycles += inst.idle_cycles
sl.instructions.append(inst)
if instructions is None:
self._source_lines = source_lines
return source_lines

@property
def source_lines(self) -> List[SourceLine]:
Expand All @@ -312,3 +397,13 @@ def source_lines(self) -> List[SourceLine]:
def instructions(self) -> List[InstructionData]:
    """Get all instructions (for the primary rank/dispatch in distributed runs)."""
    return self._instructions

@property
def rank_profiles(self) -> Dict[str, RankProfile]:
    """Get per-rank profiles for distributed runs, keyed by rank tag (empty before profile())."""
    return self._rank_profiles

@property
def distributed_context(self) -> DistributedContext:
    """Get distributed runtime metadata detected during the most recent profile() call."""
    return self._distributed_context
Loading
Loading