Binary file modified .coverage
Binary file not shown.
1,677 changes: 870 additions & 807 deletions coverage.xml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions previous_profile.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions profile.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "slopometry"
version = "20260121-2"
version = "20260125-2"
description = "Opinionated code quality metrics for code agents and humans"
readme = "README.md"
requires-python = ">=3.13"
@@ -125,3 +125,6 @@ precision = 2

[tool.coverage.html]
directory = "htmlcov"

[tool.pyrefly]
search_path = ["src", "tests"]
12 changes: 7 additions & 5 deletions src/slopometry/core/complexity_analyzer.py
@@ -145,11 +145,11 @@ def _analyze_directory(self, directory: Path) -> ComplexityMetrics:

encoder = _get_tiktoken_encoder()

files_by_complexity = {}
all_complexities = []
files_by_complexity: dict[str, int] = {}
all_complexities: list[int] = []

files_by_token_count = {}
all_token_counts = []
files_by_token_count: dict[str, int] = {}
all_token_counts: list[int] = []

for file_path in python_files:
if not file_path.exists():
@@ -313,7 +313,9 @@ def _calculate_delta(
if isinstance(current_metrics, ExtendedComplexityMetrics) and isinstance(
baseline_metrics, ExtendedComplexityMetrics
):
common_effort_files = set(baseline_metrics.files_by_effort.keys()) & set(current_metrics.files_by_effort.keys())
common_effort_files = set(baseline_metrics.files_by_effort.keys()) & set(
current_metrics.files_by_effort.keys()
)
delta.files_effort_changed = {
file_path: current_metrics.files_by_effort[file_path] - baseline_metrics.files_by_effort[file_path]
for file_path in common_effort_files
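Aside: the reflowed `common_effort_files` intersection above feeds a plain dict comprehension. A minimal sketch of that delta step, with invented file names and Halstead effort values (none of these numbers come from this PR):

```python
# Hypothetical per-file Halstead effort maps for a baseline and a current commit.
baseline_effort = {"a.py": 100.0, "b.py": 50.0}
current_effort = {"a.py": 130.0, "c.py": 20.0}

# Only files present in both snapshots get an effort delta; b.py (removed)
# and c.py (added) are reported via files_removed/files_added instead.
common_effort_files = set(baseline_effort) & set(current_effort)
files_effort_changed = {
    f: current_effort[f] - baseline_effort[f] for f in common_effort_files
}
assert files_effort_changed == {"a.py": 30.0}
```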
90 changes: 81 additions & 9 deletions src/slopometry/core/context_coverage_analyzer.py
@@ -20,6 +20,14 @@ def __init__(self, working_directory: Path):
self.working_directory = working_directory
self._import_graph: dict[str, set[str]] = {}
self._reverse_import_graph: dict[str, set[str]] = {}
self._tracked_files: list[Path] | None = None

def _get_tracked_files(self) -> list[Path]:
"""Get tracked Python files (cached)."""
if self._tracked_files is None:
tracker = GitTracker(self.working_directory)
self._tracked_files = tracker.get_tracked_python_files()
return self._tracked_files

def analyze_transcript(self, transcript_path: Path) -> ContextCoverage:
"""Analyze a session transcript to compute context coverage.
@@ -58,24 +66,86 @@ def analyze_transcript(self, transcript_path: Path) -> ContextCoverage:
def get_affected_dependents(self, changed_files: set[str]) -> list[str]:
"""Identify files that depend on the changed files (potential blind spots).

Uses a grep-based search instead of building the full import graph, for performance.

Args:
changed_files: Set of relative file paths that were modified

Returns:
List of unique file paths that import the changed files
"""
self._build_import_graph()
affected = set()

for file_path in changed_files:
dependents = self._reverse_import_graph.get(file_path, set())
dependents = self._find_dependents_fast(file_path)
affected.update(dependents)

tests = self._find_test_files(file_path)
affected.update(tests)

return sorted(list(affected - changed_files))

def _find_dependents_fast(self, file_path: str) -> set[str]:
"""Find files that import the given file using grep (fast).

Instead of parsing all files to build the import graph, grep for import patterns.
"""
import subprocess

module_patterns = self._get_import_patterns(file_path)
if not module_patterns:
return set()

dependents = set()

for pattern in module_patterns:
try:
result = subprocess.run(
["git", "grep", "-l", "-E", pattern],
cwd=self.working_directory,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
if line.endswith(".py") and line != file_path:
dependents.add(line)
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
pass

return dependents

def _get_import_patterns(self, file_path: str) -> list[str]:
"""Convert a file path to regex patterns that would import it."""
path = Path(file_path)
patterns = []

if path.name == "__init__.py":
module_parts = list(path.parent.parts)
else:
module_parts = list(path.parent.parts) + [path.stem]

if not module_parts or module_parts == ["."]:
return []

if module_parts[0] == "src":
module_parts = module_parts[1:]

if not module_parts:
return []

module_name = ".".join(module_parts)

patterns.append(f"^(from|import)\\s+{module_name.replace('.', r'\\.')}(\\s|$|,)")

if len(module_parts) > 1:
parent_module = ".".join(module_parts[:-1])
last_part = module_parts[-1]
patterns.append(f"^from\\s+{parent_module.replace('.', r'\\.')}\\s+import\\s+.*{last_part}")

return patterns

def _extract_file_events(self, transcript_path: Path) -> tuple[set[str], set[str], dict[str, int], dict[str, int]]:
"""Extract Read and Edit file paths from transcript with their sequence numbers.

@@ -173,9 +243,14 @@ def _to_relative_path(self, file_path: str) -> str | None:
return None

def _build_import_graph(self) -> None:
"""Build import graph for all Python files in working directory."""
tracker = GitTracker(self.working_directory)
python_files = tracker.get_tracked_python_files()
"""Build import graph for all Python files in working directory.

Skips if already built (cached in instance).
"""
if self._import_graph:
return

python_files = self._get_tracked_files()

for file_path in python_files:
try:
@@ -278,11 +353,8 @@ def _find_test_files(self, source_file: str) -> list[str]:
f"test_{source_name}.py",
]

tracker = GitTracker(self.working_directory)
tracked_files = tracker.get_tracked_python_files()

test_files = []
for file_path in tracked_files:
for file_path in self._get_tracked_files():
try:
rel_path = str(file_path.relative_to(self.working_directory))
except ValueError:
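To make the grep-based search concrete, here is a sketch of the round trip from `_get_import_patterns` to `git grep`, assuming a tracked file at `src/slopometry/core/git_tracker.py` (the path is chosen for illustration):

```python
import subprocess

# For src/slopometry/core/git_tracker.py, the "src" prefix is stripped and the
# module name becomes slopometry.core.git_tracker, yielding these two regexes:
patterns = [
    r"^(from|import)\s+slopometry\.core\.git_tracker(\s|$|,)",  # direct import
    r"^from\s+slopometry\.core\s+import\s+.*git_tracker",       # from-parent import
]

for pattern in patterns:
    # git grep -l prints only the names of matching files; -E selects extended
    # regexes, matching what the diff passes. Only tracked files are searched,
    # so .venv and friends never show up.
    result = subprocess.run(
        ["git", "grep", "-l", "-E", pattern],
        capture_output=True,
        text=True,
        timeout=30,
    )
    if result.returncode == 0:
        print(result.stdout.strip())
```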
2 changes: 1 addition & 1 deletion src/slopometry/core/coverage_analyzer.py
@@ -166,7 +166,7 @@ def _parse_coverage_db(self, db_path: Path) -> CoverageResult:
error_message=f"Error reading .coverage: {e}",
)
finally:
if cov is not None:
if cov is not None and cov._data is not None:
try:
cov._data.close()
except (AttributeError, TypeError):
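The added `cov._data is not None` check guards against closing a connection that was never opened. Since `_data` is a private attribute of coverage.py's `Coverage` object, a defensive sketch of the same cleanup (assuming only that the attribute may be unset or None after a failed load) looks like:

```python
from pathlib import Path

from coverage import Coverage

db_path = Path(".coverage")  # hypothetical path to a coverage SQLite file
cov = None
try:
    cov = Coverage(data_file=str(db_path))
    cov.load()
finally:
    # _data is private and may be None if load() failed early, so check it
    # before closing instead of relying on the except clause alone.
    data = getattr(cov, "_data", None) if cov is not None else None
    if data is not None:
        try:
            data.close()
        except (AttributeError, TypeError):
            pass
```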
115 changes: 75 additions & 40 deletions src/slopometry/core/git_tracker.py
@@ -186,46 +186,64 @@ def get_python_files_from_commit(self, commit_ref: str = "HEAD~1") -> list[str]:
def get_tracked_python_files(self) -> list[Path]:
"""Get list of Python files tracked by git or not ignored (if untracked).

Uses git ls-files if available, otherwise falls back to rglob with exclusion.
For non-git directories (e.g., temp extraction dirs), falls back to
finding all Python files while excluding common virtual env directories.

Returns:
List of Path objects for Python files
"""

Raises:
GitOperationError: If inside a git repo but git command fails
"""
try:
cmd = ["git", "ls-files", "--cached", "--others", "--exclude-standard"]
result = subprocess.run(
cmd,
cwd=self.working_dir,
capture_output=True,
text=True,
check=True,
timeout=30,
)
files = []
for line in result.stdout.splitlines():
if line.endswith(".py"):
files.append(self.working_dir / line)
return files

except (subprocess.SubprocessError, FileNotFoundError):
pass
if result.returncode == 0:
files = []
for line in result.stdout.splitlines():
if line.endswith(".py"):
files.append(self.working_dir / line)
return files

files = []
# Check if this is a "not a git repo" error vs actual failure
stderr = result.stderr.strip().lower()
if "not a git repository" in stderr:
# Not a git repo - fall back to finding Python files directly
return self._find_python_files_fallback()

# Actual git failure in a git repo
raise GitOperationError(f"git ls-files failed: {result.stderr.strip()}")

except subprocess.TimeoutExpired as e:
raise GitOperationError(f"git ls-files timed out: {e}") from e
except FileNotFoundError as e:
raise GitOperationError(f"git not found - is git installed? {e}") from e
except subprocess.SubprocessError as e:
raise GitOperationError(f"git ls-files failed: {e}") from e

def _find_python_files_fallback(self) -> list[Path]:
"""Find Python files without git (for non-git directories like temp extractions)."""
ignored_dirs = {
".venv",
"venv",
"env",
".env",
".git",
".idea",
".vscode",
"__pycache__",
"node_modules",
"site-packages",
"dist",
"build",
}

files = []
for file_path in self.working_dir.rglob("*.py"):
parts = file_path.relative_to(self.working_dir).parts
if any(part in ignored_dirs for part in parts):
@@ -372,6 +390,9 @@ def get_changed_python_files(self, parent_sha: str, child_sha: str) -> list[str]:
def extract_specific_files_from_commit(self, commit_ref: str, file_paths: list[str]) -> Path | None:
"""Extract specific files from a commit to a temporary directory.

Uses git archive with pathspec for batch extraction (single subprocess call)
instead of per-file git show calls.

Args:
commit_ref: Git commit reference
file_paths: List of file paths to extract
@@ -385,41 +406,55 @@ def extract_specific_files_from_commit(self, commit_ref: str, file_paths: list[str]) -> Path | None:
if not file_paths:
return None

temp_dir: Path | None = None
try:
temp_dir = Path(tempfile.mkdtemp(prefix="slopometry_delta_"))
failed_files: list[str] = []

for file_path in file_paths:
try:
result = subprocess.run(
["git", "show", f"{commit_ref}:{file_path}"],
cwd=self.working_dir,
capture_output=True,
timeout=10,
)

if result.returncode == 0:
dest_path = temp_dir / file_path
dest_path.parent.mkdir(parents=True, exist_ok=True)
dest_path.write_bytes(result.stdout)
else:
failed_files.append(file_path)
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
failed_files.append(file_path)

# Don't error on files that don't exist in this commit
# (e.g., newly added files when extracting from parent commit)

# Use git archive with pathspec to extract only the specified files
# This is O(1) subprocess calls instead of O(n) with git show
result = subprocess.run(
["git", "archive", "--format=tar", commit_ref, "--"] + file_paths,
cwd=self.working_dir,
capture_output=True,
timeout=60,
)

if result.returncode != 0:
# git archive fails if none of the files exist in this commit
# This is normal for newly added files when extracting from parent
stderr = result.stderr.decode().strip()
if "pathspec" in stderr.lower() or "not in" in stderr.lower():
shutil.rmtree(temp_dir, ignore_errors=True)
return None
raise GitOperationError(f"git archive failed for {commit_ref}: {stderr}")

from io import BytesIO

tar_data = BytesIO(result.stdout)
try:
with tarfile.open(fileobj=tar_data, mode="r") as tar:
python_members = [m for m in tar.getmembers() if m.name.endswith(".py")]
if not python_members:
shutil.rmtree(temp_dir, ignore_errors=True)
return None
tar.extractall(path=temp_dir, members=python_members, filter="data")
except tarfile.TarError as e:
shutil.rmtree(temp_dir, ignore_errors=True)
raise GitOperationError(f"Failed to extract tar for {commit_ref}: {e}") from e

if not any(temp_dir.rglob("*.py")):
if failed_files and len(failed_files) == len(file_paths):
raise GitOperationError(
f"Failed to extract any files from {commit_ref}. "
f"These files may not exist in this commit. Failed: {failed_files}"
)
shutil.rmtree(temp_dir, ignore_errors=True)
return None

return temp_dir

except subprocess.TimeoutExpired as e:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
raise GitOperationError(f"git archive timed out for {commit_ref}: {e}") from e
except (subprocess.SubprocessError, OSError) as e:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
raise GitOperationError(f"Failed to extract files from {commit_ref}: {e}") from e

def has_previous_commit(self) -> bool:
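The batch-extraction pattern generalizes beyond this class. A standalone sketch under the same assumptions (the helper name and temp-dir prefix are invented; `filter="data"` needs Python 3.12+, which the project's `requires-python = ">=3.13"` satisfies):

```python
import shutil
import subprocess
import tarfile
import tempfile
from io import BytesIO
from pathlib import Path


def extract_py_files(repo: Path, ref: str, paths: list[str]) -> Path | None:
    """Extract selected .py files from a commit with one git archive call."""
    result = subprocess.run(
        ["git", "archive", "--format=tar", ref, "--"] + paths,
        cwd=repo,
        capture_output=True,
        timeout=60,
    )
    if result.returncode != 0:
        return None  # e.g. none of the paths exist at this ref

    temp_dir = Path(tempfile.mkdtemp(prefix="example_extract_"))
    with tarfile.open(fileobj=BytesIO(result.stdout), mode="r") as tar:
        members = [m for m in tar.getmembers() if m.name.endswith(".py")]
        if not members:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return None
        # filter="data" rejects absolute paths, escaping symlinks, and other
        # hostile tar members before anything touches the filesystem.
        tar.extractall(path=temp_dir, members=members, filter="data")
    return temp_dir
```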
4 changes: 3 additions & 1 deletion src/slopometry/core/models.py
@@ -352,7 +352,9 @@ class ComplexityDelta(BaseModel):
files_added: list[str] = Field(default_factory=list)
files_removed: list[str] = Field(default_factory=list)
files_changed: dict[str, int] = Field(default_factory=dict, description="Mapping of filename to complexity delta")
files_effort_changed: dict[str, float] = Field(default_factory=dict, description="Mapping of filename to effort delta")
files_effort_changed: dict[str, float] = Field(
default_factory=dict, description="Mapping of filename to effort delta"
)
net_files_change: int = Field(default=0, description="Net change in number of files (files_added - files_removed)")
avg_complexity_change: float = 0.0
