diff --git a/.coverage b/.coverage index f6286a8..2b9fbb7 100644 Binary files a/.coverage and b/.coverage differ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0e3188d..5f8d9e8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,7 +70,7 @@ jobs: |--------|-------|-------------| | MI (normalized) | ${qpe.mi_normalized.toFixed(3)} | Maintainability Index / 100 | | Smell Penalty | ${qpe.smell_penalty.toFixed(3)} | Weighted code smell deduction | - | Adjusted Quality | ${qpe.adjusted_quality.toFixed(3)} | MI × (1 - smell_penalty) | + | Adjusted Quality | ${qpe.adjusted_quality.toFixed(3)} | MI × (1 - smell_penalty) + bonuses | | Effort Factor | ${qpe.effort_factor.toFixed(2)} | log(Halstead Effort + 1) |
diff --git a/README.md b/README.md index 6268e51..77c93d4 100644 --- a/README.md +++ b/README.md @@ -104,9 +104,11 @@ And many more undocumented features. Which worked just fine on my machine at som # Limitations -Runtime: Almost all metrics are trend-relative and the first run will do a long code analysis before caching, but if you consider using this tool, you are comfortable with waiting for agents anyway. +**Runtime**: Almost all metrics are trend-relative and the first run will do a long code analysis before caching, but if you consider using this tool, you are comfortable with waiting for agents anyway. -Compat: This software was tested mainly on Linux with Python codebases. There are plans to support Rust at some point but not any kind of cursed C++ or other unserious languages like that. I heard someone ran it on MacOS successfully once but we met the person on twitter, so YMMV. +**Git**: This tool requires git to be installed and available in PATH. Most features (baseline comparison, commit analysis, cache invalidation) depend on git operations. + +**Compat**: This software was tested mainly on Linux with Python codebases. There are plans to support Rust at some point but not any kind of cursed C++ or other unserious languages like that. I heard someone ran it on MacOS successfully once but we met the person on twitter, so YMMV. Seriously, please do not open PRs with support for any kind of unserious languages. Just fork and pretend you made it. We are ok with that. Thank you. diff --git a/coverage.xml b/coverage.xml index 314758b..4b373e8 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,5 +1,5 @@ - + @@ -16,9 +16,9 @@ - + - + @@ -79,12 +79,26 @@ - - - - - - + + + + + + + + + + + + + + + + + + + + @@ -218,7 +232,7 @@ - + @@ -298,179 +312,197 @@ - + - - + + - + - + - + - - - + + + - - + + - + - - + + - + - + - + - - - + + + - + - + - + - + - + - - - + + + - + - + - - - + + + - - + + - + + - + + + - - - - - - - - + + + + + + + + + + + - - - - - - - - - - + + + - - - - + + + - - + + - + + - - - - + + + + - - - - - - - - + - - + + - + + + - - + - - - + + - + + - + + - + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -640,7 +672,7 @@ - + @@ -1499,7 +1531,7 @@ - + @@ -1751,175 +1783,177 @@ + + - - + - + - - - + + + + - + + - - - - + - - + + + - - + + + - - - - - - - - - - + + + + + + + - - - + + + + + + - + - + - - + - - + + - + - - - - - - - - - + + + + + + + - - - + + + + + + - + - - + - + - + - - - - - - - - + + - - - - - + + + + + - + + + + + + + - - + + + + - @@ -1930,32 +1964,23 @@ - - - - - - - - - + + + + + + + + + + - - - - + + - - - - - - - - - - + + @@ -2302,7 +2327,7 @@ - + @@ -2312,152 +2337,60 @@ + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - + + - - - + - - - - + + - - + + + + + + + + + @@ -2465,361 +2398,335 @@ + + + + - + - - - + + - - + + - - - - + + + + - + + + + + - - + + - + - - - - + + - - - - - - - + + + + + - + + - + + + + + + + - - - - - - - - - - + + + - - + + - - + + + - - + + - - + + + - - - - - - - + + + - + + + - + + + + + + + - - - - - - + + + - + + + + + + + + - + + + + + + + + + + + + + + + + + + + - + + + + - - + + + - - - - - - + + + - - - - - + + - - - + + + - - - - + + - - - + + + + + + - - + + - + - - - + - - - - - + + + + + - + - - + - + - - - - + + + - - - - + + - - - - - - - + + - - - - - - - - - + - - - - - - - - - - + + - - - - - + - - - - - - - + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + - - - - - + @@ -2828,131 +2735,320 @@ - - + + + - - + + - - + - + + - + + + + + + + + - - - + + + + + - - + + + + + - - + + + + + - - - - + + + + - - - - + + + - - - - + + + + + + - + + + - - - + - - + - - + + - - - - + + + - - + + + - - - + + - + - - - - + - - - - + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -3059,6 +3155,66 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -3100,7 +3256,7 @@ - + @@ -3118,6 +3274,7 @@ + @@ -3128,282 +3285,251 @@ - - - - - - - - - - - - + + + + + + + + + + + - - - - - - - + + + - - - + + + + - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - + + + + + + + + + + + + + + - - - + + + + + + - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + - + - - - - - + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - + + + + + + + + + + + + + + + + + + + + + + + + + - - - + + - - - - - - - - - - - - - - - - - + - + + + + - + + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - + + - - - - - + + + - + - - + + + - - + - + + - + @@ -3412,21 +3538,20 @@ - - + + - + + - - - - - + + + + - - - + + @@ -3434,110 +3559,183 @@ - - - - - - + + + + + + + + - - + - + - + - - - - - + + + + + - + - - - + + + - - - + + - + - - - - - - - - + + + + + + + + + - + - + + - + + - - - + + + + - - + + + - + - - - - + + + + - - + + + - + + - - + + - + + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -3606,47 +3804,58 @@ - - - - + + - - - - - - + + - - - - - + - - - - - + + + + + - + - - + + + - + + + + + + + + + + + + + + + + + + + + + @@ -3883,503 +4092,509 @@ - + - + + + + + + + + - - - - - - - - - - - - + + + + + + - + + - - - - - - - - - - - + + + + + + + + + + - + - + - + + - - - - - - + + + + + - + - + - + - + + - - - - - - + + + + + - + - + + + + - - - - + - + - + - + - + - + - - + + + - - + - + + + - - - + - - - - - - - + + + + + + - + - - - - - - - - + + + + + + + + + - + + + - - - - - - + + + + - + - - - - - - + + + + + + - + - - + + - + - + + + + + - - - - + + - - + - + + + + - - + - + - + - + + + + - + - - + + + + + + - - + + - + + - - - - - + + + + + - - - - + - - - - - - + + + + + + + + + + + - + + + + + + + + - - - - - - - - - + - - - + - - - + + - - - - - - - - - - + + + + + + + + + - + - - - - - - + + + + + - - - - - - - + - + + + + + + + + + + + + - - + + - - - - - - - - - - + - - + + + + + + + + - - - - - - - - + + + + + + + + - - + + + + + - + - + + + - + + + - + - - - - - - + + + - - - + + + - + + - - - - - - + - - - - - + + + + + - - - - - - + + + + + + - - - - + + + + - - + + + + - - - - - - + + + + - + + + - - - - - - + + + + + - - - - - - - + + + - + + + + - + + + - + - - - + + - - - - - @@ -4389,75 +4604,82 @@ - - - - - + + + + + - - - - - - - + + - + + + + - - - - - - + + + + + + + + + + + - - - + + + + + + + + + + + + + + - - - - - - - - - - - - - + + + + - - - - - - - - + + + + + + + + + - - + + + @@ -4465,55 +4687,59 @@ - + + - - - + + - - - - + + - + + + + + + + + - - + - - - - - - - - - + - + - + + + + + + + + + @@ -4522,164 +4748,129 @@ - - + + - + - - - - - - - + + + - - - - - - - - - + + + + - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - - + @@ -4745,43 +4936,42 @@ - - + + - - + + - + - + - - - - - - - - - - - - - + + + + + + + + + + + + + + - - + - - - + + @@ -4789,141 +4979,137 @@ + + + + + - - + + + + + - + - - - + - - - - - - + - - - - - + + + + + + + - + - + - - + - - + - - - - - - - - - + + + + + - - + - - - + + + + + + - + + + - + - + - - - - - - - - + + + + + - - - + - + - - + - - + - - - - + + + + + + @@ -4931,25 +5117,27 @@ - + - + + + - - + + + + - - - - + + @@ -4959,119 +5147,148 @@ + - + - - - - - - - + + + + + + - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + - - + - + - + - - + - - - - + - - - - - + + + - - - + + - + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + - + - - + + + - + - - + + - - + - - - + + + - @@ -5087,10 +5304,10 @@ + - + - @@ -5102,115 +5319,119 @@ + - + - - + + + - - + - + + - - - - + + + - - - - - - - - + + + + + + + - + + - - + + - + + - - - - + + + - - - - - + + + + + - - + + - + + + - - - + - - + + - + + - - - - - + + + + - + - + - + + + - - - + - - - + + + + - - - + - + + + - - + + + + @@ -5251,9 +5472,9 @@ - + - + @@ -5263,8 +5484,7 @@ - - + @@ -5420,570 +5640,597 @@ - - + + - - + - + - + + - + - - + + + + - - - + + - + - + - + - - - + + + + - + - + - + - - - + + + + - - - + + - + - - - - - + + + + - - - - - - + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - + + + + + + - - - - + + + + + + + + - + - + + + - - - - - - - - + + + + + + + - + - - - + - - - - - - - - + + + + + + + - - + - + + + + - - - + + + + - + + - - - + - - - - + + + + + + + + + + - + - - - - - - - - - - - - - + + + + + + - - - - + + + + - + - + - + - - - - - + + + + + - + + - - + - + - + - + - - - - - - + + + + + - - + + - - - - + + + + + - - + + - - - - - - - - - + + + - + + + - - - + + + + + + + - + - + + - + - + - - - + + + - - + + - - - - - - - + + + + - - + - - + + - + - - - - - + + + + + + + + - - + + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + - + + - + - + + - - - - + + + + + + + + + + + + + + + + + + - + - - - - - - - - - - - - - - - - + + + + - + + + + + - + + + + + + + - + + - - - - - - - + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - + + + + - + + + + + + - + - + + + + - - - - - - - + + + + + + + + + + + + + + + + + + + + + + - + - + @@ -5999,142 +6246,150 @@ - - + + - + - + - + - - - + + + + - + - + - + - - - - - - - + + + + + + + + + - - - - - - + + + + - - + + - - - - - - + + + + + + - - + + + - - - - - + + + + + - - - - - + + + + + + + + - - - - - - - + - - - - - - - + + + + + + + + + + - - - - - - - - - - - + + + + + + + + + - - + - - - - + + + + + + + + + + + + - + - - - - - - + - + + + + + + + @@ -6172,7 +6427,7 @@ - + @@ -6181,70 +6436,126 @@ - + - - - + + + - - + + - + - - - + + + - + - + - + - - - - - + + + + + - + - + - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -6254,7 +6565,7 @@ - + @@ -6336,122 +6647,123 @@ - - - - - + + + + + + - + - - + - + + - - + + - - - + + + - - + + - + - - + + - - + + - - + + - - + + - - + + - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - + @@ -6469,34 +6781,32 @@ - - - - - - + + + + + + - - - - - - - - - - - - - - - + + + + + + + + + + + + + @@ -6589,7 +6899,7 @@ - + @@ -6602,28 +6912,32 @@ + + - - - + + - - - - - - - - - + + + + + + + + + + - - - - - - - + + + + + + + + + @@ -6830,131 +7144,142 @@ - + - - - + + + + + - - + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + + + + - + - + - - - + + + + - - + + - + - - + + - + - + - - + + - - - + + - - + + + - - + + - - - - + + + + - - - + + + - - diff --git a/pyproject.toml b/pyproject.toml index e6762e7..d77014d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "slopometry" -version = "20260114-1" +version = "20260117-1" description = "Opinionated code quality metrics for code agents and humans" readme = "README.md" requires-python = ">=3.13" diff --git a/src/slopometry/cli.py b/src/slopometry/cli.py index 0e21fac..a14050e 100644 --- a/src/slopometry/cli.py +++ b/src/slopometry/cli.py @@ -4,9 +4,8 @@ import sys import click -from rich.console import Console -console = Console() +from slopometry.display.console import console def check_slopometry_in_path() -> bool: diff --git a/src/slopometry/core/code_quality_cache.py b/src/slopometry/core/code_quality_cache.py index c876d40..6d2b02d 100644 --- a/src/slopometry/core/code_quality_cache.py +++ b/src/slopometry/core/code_quality_cache.py @@ -211,6 +211,45 @@ def cleanup_old_cache_entries(self, days_old: int = 30) -> int: except sqlite3.Error: return 0 + def update_cached_coverage( + self, + session_id: str, + test_coverage_percent: float, + test_coverage_source: str, + ) -> bool: + """Update test coverage fields in cached metrics for a session. + + Args: + session_id: Session ID to update + test_coverage_percent: Coverage percentage to store + test_coverage_source: Source file path (e.g., coverage.xml) + + Returns: + True if successfully updated, False otherwise + """ + try: + cursor = self.db_connection.execute( + "SELECT complexity_metrics_json FROM code_quality_cache WHERE session_id = ?", + (session_id,), + ) + row = cursor.fetchone() + if not row: + return False + + metrics_data = json.loads(row[0]) + metrics_data["test_coverage_percent"] = test_coverage_percent + metrics_data["test_coverage_source"] = test_coverage_source + + self.db_connection.execute( + "UPDATE code_quality_cache SET complexity_metrics_json = ? WHERE session_id = ?", + (json.dumps(metrics_data), session_id), + ) + self.db_connection.commit() + return True + + except (sqlite3.Error, json.JSONDecodeError, Exception): + return False + def get_cache_statistics(self) -> dict[str, int]: """Get statistics about the cache. diff --git a/src/slopometry/core/complexity_analyzer.py b/src/slopometry/core/complexity_analyzer.py index 8d68f7b..6a92b83 100644 --- a/src/slopometry/core/complexity_analyzer.py +++ b/src/slopometry/core/complexity_analyzer.py @@ -14,7 +14,7 @@ ExtendedComplexityMetrics, FileAnalysisResult, ) -from slopometry.core.python_feature_analyzer import PythonFeatureAnalyzer +from slopometry.core.python_feature_analyzer import PythonFeatureAnalyzer, _count_loc from slopometry.core.settings import settings logger = logging.getLogger(__name__) @@ -172,7 +172,8 @@ def _analyze_directory(self, directory: Path) -> ComplexityMetrics: files_by_token_count[relative_path] = token_count all_token_counts.append(token_count) - except (SyntaxError, UnicodeDecodeError, OSError): + except (SyntaxError, UnicodeDecodeError, OSError) as e: + logger.debug(f"Skipping unparseable file {file_path}: {e}") continue total_files = len(all_complexities) @@ -349,9 +350,39 @@ def _calculate_delta( delta.dynamic_execution_change = ( current_metrics.dynamic_execution_count - baseline_metrics.dynamic_execution_count ) + delta.single_method_class_change = ( + current_metrics.single_method_class_count - baseline_metrics.single_method_class_count + ) + delta.deep_inheritance_change = ( + current_metrics.deep_inheritance_count - baseline_metrics.deep_inheritance_count + ) + delta.passthrough_wrapper_change = ( + current_metrics.passthrough_wrapper_count - baseline_metrics.passthrough_wrapper_count + ) return delta + def _build_files_by_loc(self, python_files: list[Path], target_dir: Path) -> dict[str, int]: + """Build mapping of file path to code LOC for file filtering. + + Args: + python_files: List of Python files to analyze + target_dir: Target directory for relative path calculation + + Returns: + Dict mapping relative file paths to their code LOC + """ + files_by_loc: dict[str, int] = {} + for file_path in python_files: + try: + content = file_path.read_text(encoding="utf-8") + _, code_loc = _count_loc(content) + relative_path = self._get_relative_path(file_path, target_dir) + files_by_loc[relative_path] = code_loc + except (OSError, UnicodeDecodeError): + continue + return files_by_loc + def _get_relative_path(self, file_path: str | Path, reference_dir: Path | None = None) -> str: """Convert absolute path to relative path from reference directory. @@ -428,6 +459,7 @@ def analyze_extended_complexity(self, directory: Path | None = None) -> Extended results = [_analyze_single_file_extended(fp) for fp in python_files] files_by_complexity: dict[str, int] = {} + files_by_effort: dict[str, float] = {} all_complexities: list[int] = [] files_with_parse_errors: dict[str, str] = {} files_by_token_count: dict[str, int] = {} @@ -451,6 +483,7 @@ def analyze_extended_complexity(self, directory: Path | None = None) -> Extended continue files_by_complexity[relative_path] = result.complexity + files_by_effort[relative_path] = result.effort all_complexities.append(result.complexity) total_volume += result.volume @@ -527,6 +560,7 @@ def analyze_extended_complexity(self, directory: Path | None = None) -> Extended str_type_percentage=str_type_percentage, total_files_analyzed=total_files, files_by_complexity=files_by_complexity, + files_by_effort=files_by_effort, files_with_parse_errors=files_with_parse_errors, orphan_comment_count=feature_stats.orphan_comment_count, untracked_todo_count=feature_stats.untracked_todo_count, @@ -564,4 +598,22 @@ def analyze_extended_complexity(self, directory: Path | None = None) -> Extended dynamic_execution_files=sorted( [self._get_relative_path(p, target_dir) for p in feature_stats.dynamic_execution_files] ), + single_method_class_count=feature_stats.single_method_class_count, + deep_inheritance_count=feature_stats.deep_inheritance_count, + passthrough_wrapper_count=feature_stats.passthrough_wrapper_count, + single_method_class_files=sorted( + [self._get_relative_path(p, target_dir) for p in feature_stats.single_method_class_files] + ), + deep_inheritance_files=sorted( + [self._get_relative_path(p, target_dir) for p in feature_stats.deep_inheritance_files] + ), + passthrough_wrapper_files=sorted( + [self._get_relative_path(p, target_dir) for p in feature_stats.passthrough_wrapper_files] + ), + total_loc=feature_stats.total_loc, + code_loc=feature_stats.code_loc, + files_by_loc={ + self._get_relative_path(p, target_dir): loc + for p, loc in self._build_files_by_loc(python_files, target_dir).items() + }, ) diff --git a/src/slopometry/core/hook_handler.py b/src/slopometry/core/hook_handler.py index 86374ce..92cad87 100644 --- a/src/slopometry/core/hook_handler.py +++ b/src/slopometry/core/hook_handler.py @@ -585,7 +585,7 @@ def format_code_smell_feedback( session_id: str | None = None, working_directory: str | None = None, ) -> tuple[str, bool, bool]: - """Format code smell feedback by iterating over SmellField-marked fields. + """Format code smell feedback using get_smells() for direct field access. Args: current_metrics: Current complexity metrics with code smell counts @@ -599,7 +599,7 @@ def format_code_smell_feedback( - has_smells: whether any code smells were detected - has_blocking_smells: whether any BLOCKING smells in edited files were detected """ - blocking_fields = {"test_skip_count", "swallowed_exception_count"} + blocking_smell_names = {"test_skip", "swallowed_exception"} edited_files = edited_files or set() related_via_imports: set[str] = set() @@ -611,36 +611,26 @@ def format_code_smell_feedback( blocking_smells: list[tuple[str, int, int, str, list[str]]] = [] other_smells: list[tuple[str, int, int, list[str], str]] = [] - # NOTE: getattr usage below is intentional - we iterate over model_fields dynamically - # to discover smell fields by their is_smell marker. Field names come from Pydantic's - # schema, not external input, so this is a justified use of dynamic attribute access. - for field_name, field_info in ExtendedComplexityMetrics.model_fields.items(): - extra = field_info.json_schema_extra - if not extra or not isinstance(extra, dict) or not extra.get("is_smell"): - continue + smell_changes = delta.get_smell_changes() if delta else {} - count = getattr(current_metrics, field_name, 0) - if count == 0: + for smell in current_metrics.get_smells(): + if smell.count == 0: continue - label = str(extra["label"]) - files_field = str(extra["files_field"]) - smell_files = list(getattr(current_metrics, files_field, [])) - change_field = field_name.replace("_count", "_change") - change = getattr(delta, change_field, 0) if delta else 0 - guidance = field_info.description or "" + change = smell_changes.get(smell.name, 0) + guidance = smell.definition.guidance - if field_name in blocking_fields and edited_files: - related_files = [f for f in smell_files if _is_file_related_to_edits(f, edited_files, related_via_imports)] - unrelated_files = [f for f in smell_files if f not in related_files] + if smell.name in blocking_smell_names and edited_files: + related_files = [f for f in smell.files if _is_file_related_to_edits(f, edited_files, related_via_imports)] + unrelated_files = [f for f in smell.files if f not in related_files] if related_files: - blocking_smells.append((label, len(related_files), change, guidance, related_files)) + blocking_smells.append((smell.label, len(related_files), change, guidance, related_files)) if unrelated_files: - other_smells.append((label, len(unrelated_files), 0, unrelated_files, guidance)) + other_smells.append((smell.label, len(unrelated_files), 0, unrelated_files, guidance)) else: - other_smells.append((label, count, change, smell_files, guidance)) + other_smells.append((smell.label, smell.count, change, list(smell.files), guidance)) lines: list[str] = [] has_blocking = len(blocking_smells) > 0 diff --git a/src/slopometry/core/models.py b/src/slopometry/core/models.py index 4623f7a..8d84ea2 100644 --- a/src/slopometry/core/models.py +++ b/src/slopometry/core/models.py @@ -6,7 +6,167 @@ from typing import Any from uuid import uuid4 -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field + + +class SmellCategory(str, Enum): + """Category of code smell for organization and filtering.""" + + GENERAL = "general" + PYTHON = "python" + RUST = "rust" + + +class SmellDefinition(BaseModel): + """Definition of a code smell with all metadata.""" + + model_config = ConfigDict(frozen=True) + + internal_name: str + label: str + category: SmellCategory + weight: float + guidance: str + count_field: str + files_field: str + + +SMELL_REGISTRY: dict[str, SmellDefinition] = { + # General smells (language-agnostic concepts) + "orphan_comment": SmellDefinition( + internal_name="orphan_comment", + label="Orphan Comments", + category=SmellCategory.GENERAL, + weight=0.02, + guidance="Make sure inline code comments add meaningful information about non-obvious design tradeoffs or explain tech debt or performance implications. Consider if these could be docstrings or field descriptors instead", + count_field="orphan_comment_count", + files_field="orphan_comment_files", + ), + "untracked_todo": SmellDefinition( + internal_name="untracked_todo", + label="Untracked TODOs", + category=SmellCategory.GENERAL, + weight=0.02, + guidance="Untracked TODOs should include ticket references (JIRA-123, #123) or URLs", + count_field="untracked_todo_count", + files_field="untracked_todo_files", + ), + "swallowed_exception": SmellDefinition( + internal_name="swallowed_exception", + label="Swallowed Exceptions", + category=SmellCategory.GENERAL, + weight=0.15, + guidance="BLOCKING: You MUST present a table with columns [Location | Purpose] for each and ask user to confirm silent failure is acceptable", + count_field="swallowed_exception_count", + files_field="swallowed_exception_files", + ), + "test_skip": SmellDefinition( + internal_name="test_skip", + label="Test Skips", + category=SmellCategory.GENERAL, + weight=0.10, + guidance="BLOCKING: You MUST present a table with columns [Test Name | Intent] for each skip and ask user to confirm each is still valid", + count_field="test_skip_count", + files_field="test_skip_files", + ), + "type_ignore": SmellDefinition( + internal_name="type_ignore", + label="Type Ignores", + category=SmellCategory.GENERAL, + weight=0.08, + guidance="Review type: ignore comments - consider fixing the underlying type issue", + count_field="type_ignore_count", + files_field="type_ignore_files", + ), + "dynamic_execution": SmellDefinition( + internal_name="dynamic_execution", + label="Dynamic Execution", + category=SmellCategory.GENERAL, + weight=0.12, + guidance="Review usage of eval/exec/compile - ensure this is necessary and secure", + count_field="dynamic_execution_count", + files_field="dynamic_execution_files", + ), + "inline_import": SmellDefinition( + internal_name="inline_import", + label="Inline Imports", + category=SmellCategory.PYTHON, + weight=0.02, + guidance="Verify if these can be moved to the top of the file (except TYPE_CHECKING guards)", + count_field="inline_import_count", + files_field="inline_import_files", + ), + "dict_get_with_default": SmellDefinition( + internal_name="dict_get_with_default", + label="Dict .get() Defaults", + category=SmellCategory.PYTHON, + weight=0.05, + guidance="Consider if these are justified or just indicate modeling gaps - replace with Pydantic BaseSettings or BaseModel with narrower field types or raise explicit errors instead", + count_field="dict_get_with_default_count", + files_field="dict_get_with_default_files", + ), + "hasattr_getattr": SmellDefinition( + internal_name="hasattr_getattr", + label="hasattr/getattr", + category=SmellCategory.PYTHON, + weight=0.10, + guidance="Consider if these indicate missing domain models - replace with Pydantic BaseModel objects with explicit fields or raise explicit errors instead", + count_field="hasattr_getattr_count", + files_field="hasattr_getattr_files", + ), + "nonempty_init": SmellDefinition( + internal_name="nonempty_init", + label="Non-empty __init__", + category=SmellCategory.PYTHON, + weight=0.03, + guidance="Consider if implementation code should be moved out of __init__.py files", + count_field="nonempty_init_count", + files_field="nonempty_init_files", + ), + # Abstraction smells (unnecessary complexity) + "single_method_class": SmellDefinition( + internal_name="single_method_class", + label="Single-Method Classes", + category=SmellCategory.PYTHON, + weight=0.05, + guidance="Consider using a function instead of a class with only one method besides __init__", + count_field="single_method_class_count", + files_field="single_method_class_files", + ), + "deep_inheritance": SmellDefinition( + internal_name="deep_inheritance", + label="Deep Inheritance", + category=SmellCategory.PYTHON, + weight=0.08, + guidance="Prefer composition over inheritance; >2 base classes increases complexity", + count_field="deep_inheritance_count", + files_field="deep_inheritance_files", + ), + "passthrough_wrapper": SmellDefinition( + internal_name="passthrough_wrapper", + label="Pass-Through Wrappers", + category=SmellCategory.PYTHON, + weight=0.02, + guidance="Function that just delegates to another with same args; consider removing indirection", + count_field="passthrough_wrapper_count", + files_field="passthrough_wrapper_files", + ), +} + + +def get_smell_label(internal_name: str) -> str: + """Get display label for a smell from registry.""" + defn = SMELL_REGISTRY.get(internal_name) + return defn.label if defn else internal_name.replace("_", " ").title() + + +def get_smells_by_category(category: SmellCategory) -> list[SmellDefinition]: + """Get all smells in a category, sorted by weight (highest first).""" + return sorted( + [d for d in SMELL_REGISTRY.values() if d.category == category], + key=lambda d: d.weight, + reverse=True, + ) def SmellField( @@ -39,7 +199,6 @@ class ProjectLanguage(str, Enum): """Supported languages for complexity analysis.""" PYTHON = "python" - # RUST = "rust" # Future: Add when rust analyzer ready class ProjectSource(str, Enum): @@ -111,6 +270,13 @@ class ToolType(str, Enum): OTHER = "Other" +class AnalysisSource(str, Enum): + """Source of the impact analysis.""" + + UNCOMMITTED_CHANGES = "uncommitted_changes" + PREVIOUS_COMMIT = "previous_commit" + + class GitState(BaseModel): """Represents git repository state at a point in time.""" @@ -165,6 +331,9 @@ class ComplexityMetrics(BaseModel): files_by_complexity: dict[str, int] = Field( default_factory=dict, description="Mapping of filename to complexity score" ) + files_by_effort: dict[str, float] = Field( + default_factory=dict, description="Mapping of filename to Halstead effort" + ) files_with_parse_errors: dict[str, str] = Field( default_factory=dict, description="Files that failed to parse: {filepath: error_message}" ) @@ -199,6 +368,8 @@ class ComplexityDelta(BaseModel): total_tokens_change: int = 0 avg_tokens_change: float = 0.0 + qpe_change: float = Field(default=0.0, description="QPE delta (current - baseline)") + type_hint_coverage_change: float = 0.0 docstring_coverage_change: float = 0.0 deprecation_change: int = 0 @@ -216,6 +387,27 @@ class ComplexityDelta(BaseModel): swallowed_exception_change: int = 0 type_ignore_change: int = 0 dynamic_execution_change: int = 0 + single_method_class_change: int = 0 + deep_inheritance_change: int = 0 + passthrough_wrapper_change: int = 0 + + def get_smell_changes(self) -> dict[str, int]: + """Return smell name to change value mapping for direct access.""" + return { + "orphan_comment": self.orphan_comment_change, + "untracked_todo": self.untracked_todo_change, + "inline_import": self.inline_import_change, + "dict_get_with_default": self.dict_get_with_default_change, + "hasattr_getattr": self.hasattr_getattr_change, + "nonempty_init": self.nonempty_init_change, + "test_skip": self.test_skip_change, + "swallowed_exception": self.swallowed_exception_change, + "type_ignore": self.type_ignore_change, + "dynamic_execution": self.dynamic_execution_change, + "single_method_class": self.single_method_class_change, + "deep_inheritance": self.deep_inheritance_change, + "passthrough_wrapper": self.passthrough_wrapper_change, + } class TodoItem(BaseModel): @@ -482,6 +674,36 @@ def get_high_priority_stories(self) -> list[UserStory]: return [story for story in self.user_stories if story.priority <= 2] +class SmellData(BaseModel): + """Structured smell data with direct field access (no getattr needed).""" + + model_config = ConfigDict(frozen=True) + + name: str + count: int + files: list[str] + + @property + def definition(self) -> SmellDefinition: + """Get the smell definition from registry.""" + return SMELL_REGISTRY[self.name] + + @property + def label(self) -> str: + """Get display label from registry.""" + return self.definition.label + + @property + def category(self) -> SmellCategory: + """Get category from registry.""" + return self.definition.category + + @property + def weight(self) -> float: + """Get weight from registry.""" + return self.definition.weight + + class ExtendedComplexityMetrics(ComplexityMetrics): """Extended metrics including Halstead and Maintainability Index. @@ -569,6 +791,28 @@ class ExtendedComplexityMetrics(ComplexityMetrics): files_field="dynamic_execution_files", guidance="Review usage of eval/exec/compile - ensure this is necessary and secure", ) + single_method_class_count: int = SmellField( + label="Single-Method Classes", + files_field="single_method_class_files", + guidance="Consider using a function instead of a class with only one method besides __init__", + ) + deep_inheritance_count: int = SmellField( + label="Deep Inheritance", + files_field="deep_inheritance_files", + guidance="Prefer composition over inheritance; >2 base classes increases complexity", + ) + passthrough_wrapper_count: int = SmellField( + label="Pass-Through Wrappers", + files_field="passthrough_wrapper_files", + guidance="Function that just delegates to another with same args; consider removing indirection", + ) + + # LOC metrics (for file filtering in QPE) + total_loc: int = Field(default=0, description="Total lines of code across all files") + code_loc: int = Field(default=0, description="Non-blank, non-comment lines") + files_by_loc: dict[str, int] = Field( + default_factory=dict, description="Mapping of filepath to code LOC for file filtering" + ) orphan_comment_files: list[str] = Field(default_factory=list, description="Files with orphan comments") untracked_todo_files: list[str] = Field(default_factory=list, description="Files with untracked TODOs") @@ -580,6 +824,89 @@ class ExtendedComplexityMetrics(ComplexityMetrics): swallowed_exception_files: list[str] = Field(default_factory=list, description="Files with swallowed exceptions") type_ignore_files: list[str] = Field(default_factory=list, description="Files with type: ignore") dynamic_execution_files: list[str] = Field(default_factory=list, description="Files with eval/exec/compile") + single_method_class_files: list[str] = Field(default_factory=list, description="Files with single-method classes") + deep_inheritance_files: list[str] = Field( + default_factory=list, description="Files with deep inheritance (>2 bases)" + ) + passthrough_wrapper_files: list[str] = Field(default_factory=list, description="Files with pass-through wrappers") + + def get_smells(self) -> list["SmellData"]: + """Return all smell data as structured objects with direct field access.""" + return [ + SmellData( + name="orphan_comment", + count=self.orphan_comment_count, + files=self.orphan_comment_files, + ), + SmellData( + name="untracked_todo", + count=self.untracked_todo_count, + files=self.untracked_todo_files, + ), + SmellData( + name="swallowed_exception", + count=self.swallowed_exception_count, + files=self.swallowed_exception_files, + ), + SmellData( + name="test_skip", + count=self.test_skip_count, + files=self.test_skip_files, + ), + SmellData( + name="type_ignore", + count=self.type_ignore_count, + files=self.type_ignore_files, + ), + SmellData( + name="dynamic_execution", + count=self.dynamic_execution_count, + files=self.dynamic_execution_files, + ), + SmellData( + name="inline_import", + count=self.inline_import_count, + files=self.inline_import_files, + ), + SmellData( + name="dict_get_with_default", + count=self.dict_get_with_default_count, + files=self.dict_get_with_default_files, + ), + SmellData( + name="hasattr_getattr", + count=self.hasattr_getattr_count, + files=self.hasattr_getattr_files, + ), + SmellData( + name="nonempty_init", + count=self.nonempty_init_count, + files=self.nonempty_init_files, + ), + SmellData( + name="single_method_class", + count=self.single_method_class_count, + files=self.single_method_class_files, + ), + SmellData( + name="deep_inheritance", + count=self.deep_inheritance_count, + files=self.deep_inheritance_files, + ), + SmellData( + name="passthrough_wrapper", + count=self.passthrough_wrapper_count, + files=self.passthrough_wrapper_files, + ), + ] + + def get_smell_files(self) -> dict[str, list[str]]: + """Return smell name to files mapping for filtering.""" + return {smell.name: smell.files for smell in self.get_smells()} + + def get_smell_counts(self) -> dict[str, int]: + """Return smell name to count mapping for display.""" + return {smell.name: smell.count for smell in self.get_smells()} class ExperimentRun(BaseModel): @@ -869,7 +1196,6 @@ class RepoBaseline(BaseModel): current_metrics: ExtendedComplexityMetrics = Field(description="Metrics at HEAD commit") - # Date range and token metrics for Galen Rate calculation oldest_commit_date: datetime | None = Field( default=None, description="Timestamp of the oldest commit in the analysis" ) @@ -880,6 +1206,9 @@ class RepoBaseline(BaseModel): default=None, description="Total tokens in codebase at oldest analyzed commit" ) + qpe_stats: HistoricalMetricStats | None = Field(default=None, description="QPE statistics from commit history") + current_qpe: "QPEScore | None" = Field(default=None, description="QPE score at HEAD") + class ZScoreInterpretation(str, Enum): """Human-readable interpretation of Z-score values.""" @@ -941,6 +1270,9 @@ class ImpactAssessment(BaseModel): effort_delta: float = Field(description="Total Effort change (sum across all files)") mi_delta: float = Field(description="Total MI change (sum across all files)") + qpe_delta: float = Field(default=0.0, description="QPE change between baseline and current") + qpe_z_score: float = Field(default=0.0, description="Z-score for QPE change (positive = above avg improvement)") + def interpret_cc(self, verbose: bool = False) -> ZScoreInterpretation: """Interpret CC z-score (lower CC is better, so invert).""" return ZScoreInterpretation.from_z_score(-self.cc_z_score, verbose) @@ -953,24 +1285,28 @@ def interpret_mi(self, verbose: bool = False) -> ZScoreInterpretation: """Interpret MI z-score (higher MI is better, no inversion).""" return ZScoreInterpretation.from_z_score(self.mi_z_score, verbose) + def interpret_qpe(self, verbose: bool = False) -> ZScoreInterpretation: + """Interpret QPE z-score (higher QPE is better, no inversion).""" + return ZScoreInterpretation.from_z_score(self.qpe_z_score, verbose) + class QPEScore(BaseModel): """Quality-Per-Effort score for principled code quality comparison. - QPE provides a single metric that: - - Uses MI as the sole quality signal (no double-counting with CC/Volume) - - Normalizes by Halstead Effort for fair cross-project comparison - - Includes code smell penalties with explicit weights - - Produces bounded output suitable for GRPO advantage calculation + Provides two metrics for different use cases: + - qpe: Effort-normalized score for GRPO rollout comparison (same spec) + - qpe_absolute: Raw quality without effort normalization (cross-project/temporal) + + Uses MI as sole quality signal with sigmoid-saturated smell penalties. """ - qpe: float = Field(description="Quality-per-effort score (higher is better)") + qpe: float = Field(description="Quality-per-effort score for GRPO (higher is better)") + qpe_absolute: float = Field(description="Quality without effort normalization (for cross-project/temporal)") mi_normalized: float = Field(description="Maintainability Index normalized to 0-1") - smell_penalty: float = Field(description="Penalty from code smells (0-0.5 range)") + smell_penalty: float = Field(description="Penalty from code smells (sigmoid-saturated, 0-0.9 range)") adjusted_quality: float = Field(description="MI after smell penalty applied") effort_factor: float = Field(description="log(total_halstead_effort + 1)") - # Component breakdown for debugging smell_counts: dict[str, int] = Field( default_factory=dict, description="Individual smell counts contributing to penalty" ) @@ -994,7 +1330,6 @@ class CrossProjectComparison(BaseModel): compared_at: datetime = Field(default_factory=datetime.now) total_projects: int = Field(description="Total number of projects compared") - # Flat rankings sorted by QPE (highest first) rankings: list[ProjectQPEResult] = Field(default_factory=list, description="Projects ranked by QPE, highest first") @@ -1037,12 +1372,25 @@ class StagedChangesAnalysis(BaseModel): class CurrentChangesAnalysis(BaseModel): - """Complete analysis of uncommitted changes against repository baseline.""" + """Complete analysis of changes against repository baseline.""" repository_path: str analysis_timestamp: datetime = Field(default_factory=datetime.now) - changed_files: list[str] = Field(description="List of changed Python files (staged and unstaged)") + source: AnalysisSource = Field( + default=AnalysisSource.UNCOMMITTED_CHANGES, + description="Whether analyzing uncommitted changes or previous commit", + ) + analyzed_commit_sha: str | None = Field( + default=None, + description="SHA of the commit being analyzed (when source is PREVIOUS_COMMIT)", + ) + base_commit_sha: str | None = Field( + default=None, + description="SHA of the base commit for comparison (when source is PREVIOUS_COMMIT)", + ) + + changed_files: list[str] = Field(description="List of changed Python files") current_metrics: ExtendedComplexityMetrics = Field(description="Metrics with uncommitted changes applied") baseline_metrics: ExtendedComplexityMetrics = Field(description="Metrics at current HEAD") @@ -1054,7 +1402,6 @@ class CurrentChangesAnalysis(BaseModel): ) filtered_coverage: dict[str, float] | None = Field(default=None, description="Coverage % for changed files") - # Token impact metrics blind_spot_tokens: int = Field(default=0, description="Total tokens in blind spot files") changed_files_tokens: int = Field(default=0, description="Total tokens in changed files") complete_picture_context_size: int = Field(default=0, description="Sum of tokens in changed files + blind spots") diff --git a/src/slopometry/core/project_guard.py b/src/slopometry/core/project_guard.py new file mode 100644 index 0000000..50d2edc --- /dev/null +++ b/src/slopometry/core/project_guard.py @@ -0,0 +1,107 @@ +"""Guard against running analysis in directories with multiple projects.""" + +import logging +import subprocess +from pathlib import Path + +logger = logging.getLogger(__name__) + + +class MultiProjectError(Exception): + """Raised when analysis is attempted in a directory with multiple projects.""" + + def __init__(self, project_count: int, projects: list[str]): + self.project_count = project_count + self.projects = projects + super().__init__( + f"Directory contains {project_count} git repositories. " + f"Slopometry analyzes single projects. Run from within a specific project directory.\n" + f"Found: {', '.join(projects[:5])}{'...' if len(projects) > 5 else ''}" + ) + + +def _is_git_submodule(path: Path, root: Path) -> bool: + """Check if a .git path is a submodule (file pointing to parent's .git/modules).""" + git_path = path / ".git" + if not git_path.exists(): + return False + if git_path.is_file(): + return True + if path == root: + return False + try: + result = subprocess.run( + ["git", "-C", str(root), "config", "--file", ".gitmodules", "--get-regexp", "path"], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode == 0: + rel_path = str(path.relative_to(root)) + for line in result.stdout.strip().split("\n"): + if line and rel_path in line: + return True + except Exception as e: + logger.debug(f"Failed to check .gitmodules for submodule detection: {e}") + return False + + +def detect_multi_project_directory( + root: Path, + max_depth: int = 2, + max_projects: int = 1, +) -> list[str]: + """Detect if directory contains multiple git repositories. + + Args: + root: Directory to check + max_depth: Maximum directory depth to search (default 2) + max_projects: Maximum allowed projects before raising error (default 1) + + Returns: + List of project paths found (relative to root) + """ + root = root.resolve() + projects: list[str] = [] + + def scan_dir(path: Path, depth: int) -> None: + if depth > max_depth: + return + + git_dir = path / ".git" + if git_dir.exists(): + if not _is_git_submodule(path, root): + rel_path = str(path.relative_to(root)) if path != root else "." + projects.append(rel_path) + return + + try: + for child in path.iterdir(): + if child.is_dir() and not child.name.startswith("."): + scan_dir(child, depth + 1) + except PermissionError as e: + logger.debug(f"Permission denied scanning directory {path}: {e}") + + scan_dir(root, 0) + return projects + + +def guard_single_project(root: Path, max_depth: int = 2) -> None: + """Raise MultiProjectError if directory contains multiple projects. + + Args: + root: Directory to check + max_depth: Maximum directory depth to search + + Raises: + MultiProjectError: If multiple git repositories found + """ + projects = detect_multi_project_directory(root, max_depth=max_depth, max_projects=1) + + if len(projects) > 1: + raise MultiProjectError(len(projects), projects) + + if len(projects) == 0: + git_dir = root / ".git" + if not git_dir.exists(): + raise MultiProjectError(0, []) diff --git a/src/slopometry/core/python_feature_analyzer.py b/src/slopometry/core/python_feature_analyzer.py index ffa8496..97a1bed 100644 --- a/src/slopometry/core/python_feature_analyzer.py +++ b/src/slopometry/core/python_feature_analyzer.py @@ -23,7 +23,6 @@ class FeatureStats(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - # Basic counts (no actionable message needed) functions_count: int = 0 classes_count: int = 0 docstrings_count: int = 0 @@ -36,7 +35,6 @@ class FeatureStats(BaseModel): str_type_count: int = 0 deprecations_count: int = 0 - # Code smells - self-describing via SmellField metadata orphan_comment_count: int = SmellField( label="Orphan Comments", files_field="orphan_comment_files", @@ -87,8 +85,25 @@ class FeatureStats(BaseModel): files_field="dynamic_execution_files", guidance="Review usage of eval/exec/compile - ensure this is necessary and secure", ) + single_method_class_count: int = SmellField( + label="Single-Method Classes", + files_field="single_method_class_files", + guidance="Consider using a function instead of a class with only one method besides __init__", + ) + deep_inheritance_count: int = SmellField( + label="Deep Inheritance", + files_field="deep_inheritance_files", + guidance="Prefer composition over inheritance; >2 base classes increases complexity", + ) + passthrough_wrapper_count: int = SmellField( + label="Pass-Through Wrappers", + files_field="passthrough_wrapper_files", + guidance="Function that just delegates to another with same args; consider removing indirection", + ) + + total_loc: int = Field(default=0, description="Total lines of code") + code_loc: int = Field(default=0, description="Non-blank, non-comment lines (for QPE file filtering)") - # File tracking orphan_comment_files: set[str] = Field(default_factory=set) untracked_todo_files: set[str] = Field(default_factory=set) inline_import_files: set[str] = Field(default_factory=set) @@ -99,6 +114,21 @@ class FeatureStats(BaseModel): swallowed_exception_files: set[str] = Field(default_factory=set) type_ignore_files: set[str] = Field(default_factory=set) dynamic_execution_files: set[str] = Field(default_factory=set) + single_method_class_files: set[str] = Field(default_factory=set) + deep_inheritance_files: set[str] = Field(default_factory=set) + passthrough_wrapper_files: set[str] = Field(default_factory=set) + + +def _count_loc(content: str) -> tuple[int, int]: + """Count total lines and code lines (non-blank, non-comment). + + Returns: + Tuple of (total_loc, code_loc) + """ + lines = content.splitlines() + total = len(lines) + code = sum(1 for line in lines if line.strip() and not line.strip().startswith("#")) + return total, code def _analyze_single_file_features(file_path: Path) -> FeatureStats | None: @@ -120,6 +150,7 @@ def _analyze_single_file_features(file_path: Path) -> FeatureStats | None: is_test_file = file_path.name.startswith("test_") or "/tests/" in str(file_path) orphan_comments, untracked_todos, type_ignores = _analyze_comments_standalone(content, is_test_file) nonempty_init = 1 if _is_nonempty_init_standalone(file_path, tree) else 0 + total_loc, code_loc = _count_loc(content) path_str = str(file_path) return FeatureStats( @@ -144,6 +175,11 @@ def _analyze_single_file_features(file_path: Path) -> FeatureStats | None: swallowed_exception_count=ast_stats.swallowed_exception_count, type_ignore_count=type_ignores, dynamic_execution_count=ast_stats.dynamic_execution_count, + single_method_class_count=ast_stats.single_method_class_count, + deep_inheritance_count=ast_stats.deep_inheritance_count, + passthrough_wrapper_count=ast_stats.passthrough_wrapper_count, + total_loc=total_loc, + code_loc=code_loc, orphan_comment_files={path_str} if orphan_comments > 0 else set(), untracked_todo_files={path_str} if untracked_todos > 0 else set(), inline_import_files={path_str} if ast_stats.inline_import_count > 0 else set(), @@ -154,6 +190,9 @@ def _analyze_single_file_features(file_path: Path) -> FeatureStats | None: swallowed_exception_files={path_str} if ast_stats.swallowed_exception_count > 0 else set(), type_ignore_files={path_str} if type_ignores > 0 else set(), dynamic_execution_files={path_str} if ast_stats.dynamic_execution_count > 0 else set(), + single_method_class_files={path_str} if ast_stats.single_method_class_count > 0 else set(), + deep_inheritance_files={path_str} if ast_stats.deep_inheritance_count > 0 else set(), + passthrough_wrapper_files={path_str} if ast_stats.passthrough_wrapper_count > 0 else set(), ) @@ -247,13 +286,11 @@ def analyze_directory(self, directory: Path) -> FeatureStats: start_total = time.perf_counter() - # Use parallel processing for large file sets if len(python_files) >= settings.parallel_file_threshold: results = self._analyze_files_parallel(python_files) else: results = [_analyze_single_file_features(fp) for fp in python_files] - # Aggregate results aggregated = FeatureStats() for stats in results: if stats is not None: @@ -310,6 +347,7 @@ def _analyze_file(self, file_path: Path) -> FeatureStats: is_test_file = file_path.name.startswith("test_") or "/tests/" in str(file_path) orphan_comments, untracked_todos, type_ignores = self._analyze_comments(content, is_test_file) nonempty_init = 1 if self._is_nonempty_init(file_path, tree) else 0 + total_loc, code_loc = _count_loc(content) path_str = str(file_path) return FeatureStats( @@ -334,6 +372,11 @@ def _analyze_file(self, file_path: Path) -> FeatureStats: swallowed_exception_count=ast_stats.swallowed_exception_count, type_ignore_count=type_ignores, dynamic_execution_count=ast_stats.dynamic_execution_count, + single_method_class_count=ast_stats.single_method_class_count, + deep_inheritance_count=ast_stats.deep_inheritance_count, + passthrough_wrapper_count=ast_stats.passthrough_wrapper_count, + total_loc=total_loc, + code_loc=code_loc, orphan_comment_files={path_str} if orphan_comments > 0 else set(), untracked_todo_files={path_str} if untracked_todos > 0 else set(), inline_import_files={path_str} if ast_stats.inline_import_count > 0 else set(), @@ -344,6 +387,9 @@ def _analyze_file(self, file_path: Path) -> FeatureStats: swallowed_exception_files={path_str} if ast_stats.swallowed_exception_count > 0 else set(), type_ignore_files={path_str} if type_ignores > 0 else set(), dynamic_execution_files={path_str} if ast_stats.dynamic_execution_count > 0 else set(), + single_method_class_files={path_str} if ast_stats.single_method_class_count > 0 else set(), + deep_inheritance_files={path_str} if ast_stats.deep_inheritance_count > 0 else set(), + passthrough_wrapper_files={path_str} if ast_stats.passthrough_wrapper_count > 0 else set(), ) def _is_nonempty_init(self, file_path: Path, tree: ast.Module) -> bool: @@ -454,6 +500,11 @@ def _merge_stats(self, s1: FeatureStats, s2: FeatureStats) -> FeatureStats: swallowed_exception_count=s1.swallowed_exception_count + s2.swallowed_exception_count, type_ignore_count=s1.type_ignore_count + s2.type_ignore_count, dynamic_execution_count=s1.dynamic_execution_count + s2.dynamic_execution_count, + single_method_class_count=s1.single_method_class_count + s2.single_method_class_count, + deep_inheritance_count=s1.deep_inheritance_count + s2.deep_inheritance_count, + passthrough_wrapper_count=s1.passthrough_wrapper_count + s2.passthrough_wrapper_count, + total_loc=s1.total_loc + s2.total_loc, + code_loc=s1.code_loc + s2.code_loc, orphan_comment_files=s1.orphan_comment_files | s2.orphan_comment_files, untracked_todo_files=s1.untracked_todo_files | s2.untracked_todo_files, inline_import_files=s1.inline_import_files | s2.inline_import_files, @@ -464,6 +515,9 @@ def _merge_stats(self, s1: FeatureStats, s2: FeatureStats) -> FeatureStats: swallowed_exception_files=s1.swallowed_exception_files | s2.swallowed_exception_files, type_ignore_files=s1.type_ignore_files | s2.type_ignore_files, dynamic_execution_files=s1.dynamic_execution_files | s2.dynamic_execution_files, + single_method_class_files=s1.single_method_class_files | s2.single_method_class_files, + deep_inheritance_files=s1.deep_inheritance_files | s2.deep_inheritance_files, + passthrough_wrapper_files=s1.passthrough_wrapper_files | s2.passthrough_wrapper_files, ) @@ -490,6 +544,10 @@ def __init__(self): self.test_skips = 0 self.swallowed_exceptions = 0 self.dynamic_executions = 0 + # Abstraction smells + self.single_method_classes = 0 + self.deep_inheritances = 0 + self.passthrough_wrappers = 0 @property def stats(self) -> FeatureStats: @@ -511,6 +569,9 @@ def stats(self) -> FeatureStats: test_skip_count=self.test_skips, swallowed_exception_count=self.swallowed_exceptions, dynamic_execution_count=self.dynamic_executions, + single_method_class_count=self.single_method_classes, + deep_inheritance_count=self.deep_inheritances, + passthrough_wrapper_count=self.passthrough_wrappers, ) def _collect_type_names(self, node: ast.AST | None) -> None: @@ -588,6 +649,10 @@ def visit_FunctionDef(self, node: ast.FunctionDef) -> None: if self._is_test_skip_decorator(decorator): self.test_skips += 1 + # Pass-through wrapper detection + if self._is_passthrough(node): + self.passthrough_wrappers += 1 + self._scope_depth += 1 self.generic_visit(node) self._scope_depth -= 1 @@ -599,6 +664,11 @@ def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: self.deprecations += 1 if self._is_test_skip_decorator(decorator): self.test_skips += 1 + + # Pass-through wrapper detection + if self._is_passthrough(node): + self.passthrough_wrappers += 1 + self._scope_depth += 1 self.generic_visit(node) self._scope_depth -= 1 @@ -633,6 +703,17 @@ def visit_ClassDef(self, node: ast.ClassDef) -> None: if self._is_deprecated_decorator(decorator): self.deprecations += 1 + # Deep inheritance: >2 base classes + if len(node.bases) > 2: + self.deep_inheritances += 1 + + # Single-method class: only one method besides __init__ + methods = [ + n for n in node.body if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef) and n.name != "__init__" + ] + if len(methods) == 1: + self.single_method_classes += 1 + self._scope_depth += 1 self.generic_visit(node) self._scope_depth -= 1 @@ -728,6 +809,36 @@ def _is_dynamic_execution_call(self, node: ast.Call) -> bool: return func.id in ("eval", "exec", "compile") return False + def _is_passthrough(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> bool: + """Check if function just returns a call with same arguments. + + A pass-through wrapper is a function whose body is just: + - return other_func(arg1, arg2, ...) + where arg1, arg2, ... are exactly the function's parameters (excluding self/cls). + """ + # Skip if function has docstring (body[0] is docstring, body[1] might be return) + body = node.body + if ast.get_docstring(node): + body = body[1:] + + if len(body) != 1: + return False + + stmt = body[0] + if not isinstance(stmt, ast.Return) or not isinstance(stmt.value, ast.Call): + return False + + call = stmt.value + + func_args = [arg.arg for arg in node.args.args if arg.arg not in ("self", "cls")] + + if not func_args: + return False + + call_args = [arg.id for arg in call.args if isinstance(arg, ast.Name)] + + return func_args == call_args + def visit_If(self, node: ast.If) -> None: """Track TYPE_CHECKING blocks to exclude their imports.""" if self._is_type_checking_guard(node): diff --git a/src/slopometry/core/settings.py b/src/slopometry/core/settings.py index aeffe3a..501f1dd 100644 --- a/src/slopometry/core/settings.py +++ b/src/slopometry/core/settings.py @@ -130,6 +130,34 @@ def _ensure_global_config_dir() -> None: baseline_max_commits: int = Field(default=100, description="Maximum commits to analyze for baseline computation") + qpe_sigmoid_steepness: float = Field( + default=2.0, + description="Steepness factor for QPE smell penalty sigmoid (higher = faster saturation)", + ) + + # QPE bonus thresholds (coverage % needed to earn bonus) + qpe_test_coverage_threshold: float = Field(default=80.0, description="Test coverage % threshold to earn QPE bonus") + qpe_type_coverage_threshold: float = Field( + default=90.0, description="Type hint coverage % threshold to earn QPE bonus" + ) + qpe_docstring_coverage_threshold: float = Field( + default=80.0, description="Docstring coverage % threshold to earn QPE bonus" + ) + + # QPE bonus amounts (added to adjusted_quality when threshold met) + qpe_test_coverage_bonus: float = Field(default=0.05, description="QPE bonus for meeting test coverage threshold") + qpe_type_coverage_bonus: float = Field( + default=0.05, description="QPE bonus for meeting type hint coverage threshold" + ) + qpe_docstring_coverage_bonus: float = Field( + default=0.02, description="QPE bonus for meeting docstring coverage threshold" + ) + + # QPE file filtering (anti-gaming) + qpe_min_loc_per_file: int = Field( + default=10, description="Minimum code LOC to count a file for smell normalization" + ) + impact_cc_weight: float = Field(default=0.25, description="Weight for CC in impact score calculation") impact_effort_weight: float = Field( default=0.25, description="Weight for Halstead Effort in impact score calculation" @@ -159,12 +187,16 @@ def warn_unknown_prefixed_settings(self) -> "Settings": if key.upper().startswith(prefix): prefixed_env_vars.add(key.upper()) - for env_file in self.model_config.get("env_file", []): - env_path = Path(env_file) - if env_path.exists(): - for key in dotenv_values(env_path): - if key.upper().startswith(prefix): - prefixed_env_vars.add(key.upper()) + env_files = self.model_config.get("env_file", []) + if env_files is not None: + # Normalize to list - env_file can be a single path or list of paths + env_file_list = [env_files] if isinstance(env_files, (str, Path)) else list(env_files) + for env_file in env_file_list: + env_path = Path(env_file) + if env_path.exists(): + for key in dotenv_values(env_path): + if key.upper().startswith(prefix): + prefixed_env_vars.add(key.upper()) unknown = prefixed_env_vars - known_fields if unknown: diff --git a/src/slopometry/display/console.py b/src/slopometry/display/console.py new file mode 100644 index 0000000..1ae7756 --- /dev/null +++ b/src/slopometry/display/console.py @@ -0,0 +1,7 @@ +"""Singleton Rich Console instance for consistent output across the application.""" + +from rich.console import Console + +# Single console instance used throughout the application +# This ensures pager context works correctly across modules +console = Console() diff --git a/src/slopometry/display/formatters.py b/src/slopometry/display/formatters.py index 2c52ac4..795a188 100644 --- a/src/slopometry/display/formatters.py +++ b/src/slopometry/display/formatters.py @@ -4,10 +4,17 @@ from datetime import datetime from pathlib import Path -from rich.console import Console from rich.table import Table -from slopometry.core.models import CompactEvent, TokenUsage, ZScoreInterpretation +from slopometry.core.models import ( + SMELL_REGISTRY, + CompactEvent, + SmellCategory, + TokenUsage, + ZScoreInterpretation, + get_smell_label, + get_smells_by_category, +) from slopometry.core.settings import settings logger = logging.getLogger(__name__) @@ -60,6 +67,7 @@ def truncate_path(path: str, max_width: int = 55) -> str: from slopometry.core.models import ( + AnalysisSource, ContextCoverage, CrossProjectComparison, CurrentChangesAnalysis, @@ -73,8 +81,7 @@ def truncate_path(path: str, max_width: int = 55) -> str: SessionStatistics, StagedChangesAnalysis, ) - -console = Console() +from slopometry.display.console import console def _calculate_galen_metrics_from_baseline( @@ -257,7 +264,7 @@ def _display_compact_events(compact_events: list[CompactEvent]) -> None: console.print(f"\n[bold]Compacts ({len(compact_events)})[/bold]") table = Table() - table.add_column("Time", style="cyan") + table.add_column("Date/Time", style="cyan") table.add_column("Trigger", style="yellow") table.add_column("Pre-Tokens", justify="right") table.add_column("Version", style="dim") @@ -266,7 +273,7 @@ def _display_compact_events(compact_events: list[CompactEvent]) -> None: for compact in compact_events: table.add_row( - compact.timestamp.strftime("%H:%M:%S"), + compact.timestamp.strftime("%m-%d %H:%M"), compact.trigger, _format_token_count(compact.pre_tokens), compact.version, @@ -387,40 +394,15 @@ def _display_complexity_metrics( ) overview_table.add_row("[bold]Code Smells[/bold]", "") + smell_counts = metrics.get_smell_counts() - smell_color = "red" if metrics.orphan_comment_count > 0 else "green" - overview_table.add_row(" Orphan Comments", f"[{smell_color}]{metrics.orphan_comment_count}[/{smell_color}]") - - smell_color = "red" if metrics.untracked_todo_count > 0 else "green" - overview_table.add_row(" Untracked TODOs", f"[{smell_color}]{metrics.untracked_todo_count}[/{smell_color}]") - - smell_color = "red" if metrics.inline_import_count > 0 else "green" - overview_table.add_row(" Inline Imports", f"[{smell_color}]{metrics.inline_import_count}[/{smell_color}]") - - smell_color = "red" if metrics.dict_get_with_default_count > 0 else "green" - overview_table.add_row( - " .get() w/ Defaults", f"[{smell_color}]{metrics.dict_get_with_default_count}[/{smell_color}]" - ) - - smell_color = "red" if metrics.hasattr_getattr_count > 0 else "green" - overview_table.add_row(" hasattr/getattr", f"[{smell_color}]{metrics.hasattr_getattr_count}[/{smell_color}]") - - smell_color = "red" if metrics.nonempty_init_count > 0 else "green" - overview_table.add_row(" Non-empty __init__", f"[{smell_color}]{metrics.nonempty_init_count}[/{smell_color}]") - - smell_color = "red" if metrics.test_skip_count > 0 else "green" - overview_table.add_row(" Test Skips", f"[{smell_color}]{metrics.test_skip_count}[/{smell_color}]") - - smell_color = "red" if metrics.swallowed_exception_count > 0 else "green" - overview_table.add_row( - " Swallowed Exceptions", f"[{smell_color}]{metrics.swallowed_exception_count}[/{smell_color}]" - ) - - smell_color = "red" if metrics.type_ignore_count > 0 else "green" - overview_table.add_row(" Type Ignores", f"[{smell_color}]{metrics.type_ignore_count}[/{smell_color}]") - - smell_color = "red" if metrics.dynamic_execution_count > 0 else "green" - overview_table.add_row(" Dynamic Execution", f"[{smell_color}]{metrics.dynamic_execution_count}[/{smell_color}]") + for category in [SmellCategory.GENERAL, SmellCategory.PYTHON]: + category_label = "General" if category == SmellCategory.GENERAL else "Python" + overview_table.add_row(f" [dim]{category_label}[/dim]", "") + for defn in get_smells_by_category(category): + count = smell_counts[defn.internal_name] + smell_color = "red" if count > 0 else "green" + overview_table.add_row(f" {defn.label}", f"[{smell_color}]{count}[/{smell_color}]") console.print(overview_table) @@ -430,32 +412,20 @@ def _display_complexity_metrics( if show_smell_files: _display_code_smells_detailed(metrics) else: - has_smell_files = any( - [ - metrics.orphan_comment_files, - metrics.untracked_todo_files, - metrics.inline_import_files, - metrics.dict_get_with_default_files, - metrics.hasattr_getattr_files, - metrics.nonempty_init_files, - metrics.test_skip_files, - metrics.swallowed_exception_files, - metrics.type_ignore_files, - metrics.dynamic_execution_files, - ] - ) + smell_files = metrics.get_smell_files() + has_smell_files = any(smell_files.values()) if has_smell_files: console.print("\n[dim]Run with --smell-details to see affected files[/dim]") - if metrics.files_by_complexity: - files_table = Table(title="Files by Complexity") + if metrics.files_by_effort: + files_table = Table(title="Files by Halstead Effort") files_table.add_column("File", style="cyan", no_wrap=True, max_width=55) - files_table.add_column("Complexity", justify="right", width=12) + files_table.add_column("Effort", justify="right", width=12) - sorted_files = sorted(metrics.files_by_complexity.items(), key=lambda x: x[1], reverse=True)[:10] + sorted_files = sorted(metrics.files_by_effort.items(), key=lambda x: x[1], reverse=True)[:10] - for file_path, complexity in sorted_files: - files_table.add_row(truncate_path(file_path, max_width=55), str(complexity)) + for file_path, effort in sorted_files: + files_table.add_row(truncate_path(file_path, max_width=55), f"{effort:,.0f}") console.print(files_table) @@ -1136,21 +1106,28 @@ def display_current_impact_analysis( compact_events: list[CompactEvent] | None = None, show_file_details: bool = False, ) -> None: - """Display uncommitted changes impact analysis with Rich formatting. + """Display changes impact analysis with Rich formatting. Args: analysis: The current changes analysis to display compact_events: Optional list of compact events from session transcript show_file_details: Whether to show detailed file lists (blind spots) """ - console.print("\n[bold]Uncommitted Changes Impact Analysis[/bold]") + if analysis.source == AnalysisSource.PREVIOUS_COMMIT: + title = f"Previous Commit Impact Analysis ({analysis.base_commit_sha}..{analysis.analyzed_commit_sha})" + impact_title = "Previous Commit Impact" + else: + title = "Uncommitted Changes Impact Analysis" + impact_title = "Uncommitted Changes Impact" + + console.print(f"\n[bold]{title}[/bold]") console.print(f"Repository: {analysis.repository_path}") display_baseline_comparison( baseline=analysis.baseline, assessment=analysis.assessment, - title="Uncommitted Changes Impact", + title=impact_title, ) console.print("\n[bold]Token Impact:[/bold]") @@ -1251,33 +1228,17 @@ def get_filtered_files(files: list[str]) -> list[str]: return files return [f for f in files if f in filter_files] - has_smells = False - - orphan_files = get_filtered_files(metrics.orphan_comment_files) - todo_files = get_filtered_files(metrics.untracked_todo_files) - import_files = get_filtered_files(metrics.inline_import_files) - get_files = get_filtered_files(metrics.dict_get_with_default_files) - getattr_files = get_filtered_files(metrics.hasattr_getattr_files) - init_files = get_filtered_files(metrics.nonempty_init_files) - test_skip_files = get_filtered_files(metrics.test_skip_files) - swallowed_exception_files = get_filtered_files(metrics.swallowed_exception_files) - type_ignore_files = get_filtered_files(metrics.type_ignore_files) - dynamic_execution_files = get_filtered_files(metrics.dynamic_execution_files) - - if ( - orphan_files - or todo_files - or import_files - or get_files - or getattr_files - or init_files - or test_skip_files - or swallowed_exception_files - or type_ignore_files - or dynamic_execution_files - ): - has_smells = True + smell_files = metrics.get_smell_files() + smell_data_by_category: dict[SmellCategory, list[tuple[str, list[str]]]] = { + SmellCategory.GENERAL: [], + SmellCategory.PYTHON: [], + } + for defn in SMELL_REGISTRY.values(): + files = get_filtered_files(smell_files[defn.internal_name]) + if files: + smell_data_by_category[defn.category].append((defn.label, files)) + has_smells = any(smell_data_by_category[cat] for cat in smell_data_by_category) if not has_smells: return @@ -1290,28 +1251,16 @@ def get_filtered_files(files: list[str]) -> list[str]: table.add_column("Count", justify="right", width=8) table.add_column("Affected Files", style="dim", max_width=55) - def add_smell_row(label: str, files: list[str]) -> None: - count = len(files) - if count == 0: - return - - color = "red" - count_str = f"[{color}]{count}[/{color}]" - - files_display = "\n".join(truncate_path(f, max_width=55) for f in sorted(files)) - - table.add_row(label, count_str, files_display) - - add_smell_row("Orphan Comments", orphan_files) - add_smell_row("Untracked TODOs", todo_files) - add_smell_row("Inline Imports", import_files) - add_smell_row(".get() w/ Defaults", get_files) - add_smell_row("hasattr/getattr", getattr_files) - add_smell_row("Non-empty __init__", init_files) - add_smell_row("Test Skips", test_skip_files) - add_smell_row("Swallowed Exceptions", swallowed_exception_files) - add_smell_row("Type Ignores", type_ignore_files) - add_smell_row("Dynamic Execution", dynamic_execution_files) + for category in [SmellCategory.GENERAL, SmellCategory.PYTHON]: + smell_data = smell_data_by_category[category] + if not smell_data: + continue + category_label = "General" if category == SmellCategory.GENERAL else "Python" + table.add_row(f"[bold dim]{category_label}[/bold dim]", "", "") + for label, files in smell_data: + count_str = f"[red]{len(files)}[/red]" + files_display = "\n".join(truncate_path(f, max_width=55) for f in sorted(files)) + table.add_row(f" {label}", count_str, files_display) console.print(table) @@ -1335,24 +1284,13 @@ def display_baseline_comparison( baseline_table.add_column("Std Dev", justify="right") baseline_table.add_column("Trend", justify="right") - baseline_table.add_row( - "Cyclomatic Complexity", - f"{baseline.cc_delta_stats.mean:+.1f}", - f"±{baseline.cc_delta_stats.std_dev:.1f}", - _format_trend(baseline.cc_delta_stats.trend_coefficient, lower_is_better=True), - ) - baseline_table.add_row( - "Halstead Effort", - f"{baseline.effort_delta_stats.mean:+.2f}", - f"±{baseline.effort_delta_stats.std_dev:.2f}", - _format_trend(baseline.effort_delta_stats.trend_coefficient, lower_is_better=True), - ) - baseline_table.add_row( - "Maintainability Index", - f"{baseline.mi_delta_stats.mean:+.2f}", - f"±{baseline.mi_delta_stats.std_dev:.2f}", - _format_trend(baseline.mi_delta_stats.trend_coefficient, lower_is_better=False), - ) + if baseline.qpe_stats: + baseline_table.add_row( + "QPE (GRPO)", + f"{baseline.qpe_stats.mean:+.4f}", + f"±{baseline.qpe_stats.std_dev:.4f}", + _format_trend(baseline.qpe_stats.trend_coefficient, lower_is_better=False), + ) console.print(baseline_table) @@ -1364,28 +1302,12 @@ def display_baseline_comparison( impact_table.add_column("Z-Score", justify="right") impact_table.add_column("Assessment", style="dim") - cc_color = "green" if assessment.cc_z_score < 0 else "red" if assessment.cc_z_score > 0 else "yellow" - impact_table.add_row( - "Cyclomatic Complexity", - f"[{cc_color}]{assessment.cc_delta:+.2f}[/{cc_color}]", - f"{assessment.cc_z_score:+.2f}", - _interpret_z_score(-assessment.cc_z_score), - ) - - effort_color = "green" if assessment.effort_z_score < 0 else "red" if assessment.effort_z_score > 0 else "yellow" - impact_table.add_row( - "Average Effort", - f"[{effort_color}]{assessment.effort_delta:+.2f}[/{effort_color}]", - f"{assessment.effort_z_score:+.2f}", - _interpret_z_score(-assessment.effort_z_score), - ) - - mi_color = "green" if assessment.mi_z_score > 0 else "red" if assessment.mi_z_score < 0 else "yellow" + qpe_color = "green" if assessment.qpe_z_score > 0 else "red" if assessment.qpe_z_score < 0 else "yellow" impact_table.add_row( - "Maintainability Index", - f"[{mi_color}]{assessment.mi_delta:+.2f}[/{mi_color}]", - f"{assessment.mi_z_score:+.2f}", - _interpret_z_score(assessment.mi_z_score), + "QPE (GRPO)", + f"[{qpe_color}]{assessment.qpe_delta:+.4f}[/{qpe_color}]", + f"{assessment.qpe_z_score:+.2f}", + _interpret_z_score(assessment.qpe_z_score), ) console.print(impact_table) @@ -1416,21 +1338,9 @@ def display_baseline_comparison_compact( lines = [] lines.append(f"Repository Baseline ({baseline.total_commits_analyzed} commits):") - cc_sign = "↓" if assessment.cc_z_score < 0 else "↑" if assessment.cc_z_score > 0 else "→" - cc_quality = "good" if assessment.cc_z_score < 0 else "above avg" if assessment.cc_z_score > 0 else "avg" - lines.append(f" CC: {assessment.cc_delta:+.2f} (Z: {assessment.cc_z_score:+.2f} {cc_sign} {cc_quality})") - - effort_sign = "↓" if assessment.effort_z_score < 0 else "↑" if assessment.effort_z_score > 0 else "→" - effort_quality = ( - "good" if assessment.effort_z_score < 0 else "above avg" if assessment.effort_z_score > 0 else "avg" - ) - lines.append( - f" Effort: {assessment.effort_delta:+.2f} (Z: {assessment.effort_z_score:+.2f} {effort_sign} {effort_quality})" - ) - - mi_sign = "↑" if assessment.mi_z_score > 0 else "↓" if assessment.mi_z_score < 0 else "→" - mi_quality = "good" if assessment.mi_z_score > 0 else "below avg" if assessment.mi_z_score < 0 else "avg" - lines.append(f" MI: {assessment.mi_delta:+.2f} (Z: {assessment.mi_z_score:+.2f} {mi_sign} {mi_quality})") + qpe_sign = "↑" if assessment.qpe_z_score > 0 else "↓" if assessment.qpe_z_score < 0 else "→" + qpe_quality = "good" if assessment.qpe_z_score > 0 else "below avg" if assessment.qpe_z_score < 0 else "avg" + lines.append(f" QPE (GRPO): {assessment.qpe_delta:+.4f} (Z: {assessment.qpe_z_score:+.2f} {qpe_sign} {qpe_quality})") category_display = assessment.impact_category.value.replace("_", " ").upper() lines.append(f"Session Impact: {category_display} ({assessment.impact_score:+.2f})") @@ -1451,8 +1361,11 @@ def display_qpe_score( console.print("\n[bold]Quality-Per-Effort Score[/bold]") + # Show both QPE metrics qpe_color = "green" if qpe_score.qpe > 0.05 else "yellow" if qpe_score.qpe > 0.02 else "red" - console.print(f" [bold]QPE:[/bold] [{qpe_color}]{qpe_score.qpe:.4f}[/{qpe_color}]") + qual_color = "green" if qpe_score.qpe_absolute > 0.6 else "yellow" if qpe_score.qpe_absolute > 0.4 else "red" + console.print(f" [bold]QPE (GRPO):[/bold] [{qpe_color}]{qpe_score.qpe:.4f}[/{qpe_color}] [dim]effort-normalized for rollout comparison[/dim]") + console.print(f" [bold]Quality:[/bold] [{qual_color}]{qpe_score.qpe_absolute:.4f}[/{qual_color}] [dim]absolute for cross-project/temporal[/dim]") component_table = Table(title="QPE Components", show_header=True) component_table.add_column("Component", style="cyan") @@ -1469,13 +1382,13 @@ def display_qpe_score( component_table.add_row( "Smell Penalty", f"[{smell_color}]{qpe_score.smell_penalty:.3f}[/{smell_color}]", - "Weighted code smell deduction (0-0.5)", + "Sigmoid-saturated deduction (0-0.9)", ) component_table.add_row( "Adjusted Quality", f"{qpe_score.adjusted_quality:.3f}", - "MI × (1 - smell_penalty)", + "MI × (1 - smell_penalty) + bonuses", ) component_table.add_row( @@ -1491,13 +1404,23 @@ def display_qpe_score( smell_table.add_column("Smell", style="cyan") smell_table.add_column("Count", justify="right") - for smell_name, count in sorted(qpe_score.smell_counts.items(), key=lambda x: -x[1]): - if count > 0: - smell_table.add_row(smell_name.replace("_", " ").title(), str(count)) + for category in [SmellCategory.GENERAL, SmellCategory.PYTHON]: + category_smells = [ + (name, qpe_score.smell_counts[name]) + for name in qpe_score.smell_counts + if SMELL_REGISTRY.get(name) and SMELL_REGISTRY[name].category == category + ] + category_smells = [(n, c) for n, c in category_smells if c > 0] + if not category_smells: + continue + category_label = "General" if category == SmellCategory.GENERAL else "Python" + smell_table.add_row(f"[bold dim]{category_label}[/bold dim]", "") + for smell_name, count in sorted(category_smells, key=lambda x: -x[1]): + smell_table.add_row(f" {get_smell_label(smell_name)}", str(count)) console.print(smell_table) - console.print("\n[dim]Higher QPE = better quality per unit effort[/dim]") + console.print("\n[dim]Higher QPE (GRPO) = better quality per effort | Higher Quality = better absolute quality[/dim]") def display_cross_project_comparison(comparison: "CrossProjectComparison") -> None: @@ -1538,7 +1461,7 @@ def display_cross_project_comparison(comparison: "CrossProjectComparison") -> No ) console.print(table) - console.print("\n[dim]Higher QPE = better quality per unit effort[/dim]") + console.print("\n[dim]Higher QPE = better quality per effort | Higher Quality = better absolute quality[/dim]") def display_leaderboard(entries: list) -> None: @@ -1588,4 +1511,4 @@ def display_leaderboard(entries: list) -> None: ) console.print(table) - console.print("\n[dim]Higher QPE = better quality per unit effort. Use --append to add projects.[/dim]") + console.print("\n[dim]Higher QPE = better quality per effort. Use --append to add projects.[/dim]") diff --git a/src/slopometry/solo/cli/commands.py b/src/slopometry/solo/cli/commands.py index e05fb4a..960c5eb 100644 --- a/src/slopometry/solo/cli/commands.py +++ b/src/slopometry/solo/cli/commands.py @@ -5,14 +5,14 @@ from typing import TYPE_CHECKING import click -from rich.console import Console + +from slopometry.display.console import console if TYPE_CHECKING: from slopometry.core.models import ImpactAssessment, RepoBaseline, SessionStatistics # Imports moved inside functions to optimize startup time -console = Console() logger = logging.getLogger(__name__) @@ -121,7 +121,8 @@ def list_sessions(limit: int) -> None: @click.argument("session_id", shell_complete=complete_session_id) @click.option("--smell-details", is_flag=True, help="Show files affected by each code smell") @click.option("--file-details", is_flag=True, help="Show full file lists in delta sections") -def show(session_id: str, smell_details: bool, file_details: bool) -> None: +@click.option("--pager/--no-pager", default=True, help="Use pager for long output (like less)") +def show(session_id: str, smell_details: bool, file_details: bool, pager: bool) -> None: """Show detailed statistics for a session.""" import time @@ -138,24 +139,33 @@ def show(session_id: str, smell_details: bool, file_details: bool) -> None: return baseline, assessment = _compute_session_baseline(stats) - display_session_summary( - stats, - session_id, - baseline, - assessment, - show_smell_files=smell_details, - show_file_details=file_details, - ) - elapsed = time.perf_counter() - start_time - if elapsed > 5: - console.print(f"\n[dim]Analysis completed in {elapsed:.1f}s[/dim]") + def _display() -> None: + display_session_summary( + stats, + session_id, + baseline, + assessment, + show_smell_files=smell_details, + show_file_details=file_details, + ) + + elapsed = time.perf_counter() - start_time + if elapsed > 5: + console.print(f"\n[dim]Analysis completed in {elapsed:.1f}s[/dim]") + + if pager: + with console.pager(styles=True): + _display() + else: + _display() @click.command() @click.option("--smell-details", is_flag=True, help="Show files affected by each code smell") @click.option("--file-details", is_flag=True, help="Show full file lists in delta sections") -def latest(smell_details: bool, file_details: bool) -> None: +@click.option("--pager/--no-pager", default=True, help="Use pager for long output (like less)") +def latest(smell_details: bool, file_details: bool, pager: bool) -> None: """Show detailed statistics for the most recent session.""" import time @@ -177,6 +187,15 @@ def latest(smell_details: bool, file_details: bool) -> None: if stats.complexity_metrics and stats.working_directory: working_dir = Path(stats.working_directory) if working_dir.exists(): + from slopometry.core.project_guard import MultiProjectError, guard_single_project + + try: + guard_single_project(working_dir) + except MultiProjectError as e: + console.print(f"[red]Error:[/red] {e}") + return + + # Always fetch fresh coverage from coverage.xml (don't use stale cached value) try: from slopometry.core.coverage_analyzer import CoverageAnalyzer @@ -186,22 +205,45 @@ def latest(smell_details: bool, file_details: bool) -> None: if coverage_result.coverage_available: stats.complexity_metrics.test_coverage_percent = coverage_result.total_coverage_percent stats.complexity_metrics.test_coverage_source = coverage_result.source_file + + try: + from slopometry.core.code_quality_cache import CodeQualityCacheManager + from slopometry.core.database import EventDatabase + + db = EventDatabase() + with db._get_db_connection() as conn: + cache_manager = CodeQualityCacheManager(conn) + cache_manager.update_cached_coverage( + most_recent, + coverage_result.total_coverage_percent, + coverage_result.source_file or "", + ) + except Exception as cache_err: + logger.debug(f"Failed to cache coverage result: {cache_err}") except Exception as e: logger.debug(f"Coverage analysis failed (optional): {e}") baseline, assessment = _compute_session_baseline(stats) - display_session_summary( - stats, - most_recent, - baseline, - assessment, - show_smell_files=smell_details, - show_file_details=file_details, - ) - elapsed = time.perf_counter() - start_time - if elapsed > 5: - console.print(f"\n[dim]Analysis completed in {elapsed:.1f}s[/dim]") + def _display() -> None: + display_session_summary( + stats, + most_recent, + baseline, + assessment, + show_smell_files=smell_details, + show_file_details=file_details, + ) + + elapsed = time.perf_counter() - start_time + if elapsed > 5: + console.print(f"\n[dim]Analysis completed in {elapsed:.1f}s[/dim]") + + if pager: + with console.pager(styles=True): + _display() + else: + _display() def _compute_session_baseline( diff --git a/src/slopometry/solo/services/hook_service.py b/src/slopometry/solo/services/hook_service.py index 5e2d4ab..a1baa7c 100644 --- a/src/slopometry/solo/services/hook_service.py +++ b/src/slopometry/solo/services/hook_service.py @@ -1,9 +1,12 @@ """Hook management service for Claude Code integration.""" import json +import logging from datetime import datetime from pathlib import Path +logger = logging.getLogger(__name__) + from pydantic import BaseModel, Field from slopometry.core.settings import get_default_config_dir, get_default_data_dir, settings @@ -115,7 +118,7 @@ def load(cls, path: Path) -> "ClaudeSettings": def save(self, path: Path) -> None: """Save settings to file.""" path.parent.mkdir(exist_ok=True) - path.write_text(self.model_dump_json(indent=2, exclude_defaults=True)) + path.write_text(self.model_dump_json(indent=2, exclude_none=True)) class HookService: @@ -149,7 +152,8 @@ def _update_gitignore(self, working_dir: Path) -> tuple[bool, str | None]: for line in content.splitlines(): if line.strip().rstrip("/") == entry.rstrip("/"): return False, None - except OSError: + except OSError as e: + logger.debug(f"Failed to read .gitignore at {gitignore_path}: {e}") return False, None try: diff --git a/src/slopometry/summoner/cli/commands.py b/src/slopometry/summoner/cli/commands.py index 5eb59d1..7a3accc 100644 --- a/src/slopometry/summoner/cli/commands.py +++ b/src/slopometry/summoner/cli/commands.py @@ -8,11 +8,11 @@ import click from click.shell_completion import CompletionItem -from rich.console import Console + +from slopometry.display.console import console # Imports moved inside functions to optimize startup time -console = Console() logger = logging.getLogger(__name__) @@ -295,11 +295,13 @@ def _show_commit_range_baseline_comparison(repo_path: Path, start: str, end: str is_flag=True, help="Show detailed file lists (blind spots)", ) +@click.option("--pager/--no-pager", default=True, help="Use pager for long output (like less)") def current_impact( repo_path: Path | None, recompute_baseline: bool, max_workers: int, file_details: bool, + pager: bool, ) -> None: """Analyze impact of uncommitted changes against repository baseline. @@ -319,6 +321,14 @@ def current_impact( if repo_path is None: repo_path = Path.cwd() + from slopometry.core.project_guard import MultiProjectError, guard_single_project + + try: + guard_single_project(repo_path) + except MultiProjectError as e: + console.print(f"[red]Error:[/red] {e}") + return + from slopometry.core.language_guard import check_language_support from slopometry.core.models import ProjectLanguage from slopometry.core.working_tree_extractor import WorkingTreeExtractor @@ -334,22 +344,22 @@ def current_impact( return extractor = WorkingTreeExtractor(repo_path) + baseline_service = BaselineService() + current_impact_service = CurrentImpactService() changed_files = extractor.get_changed_python_files() + analyze_previous = False + if not changed_files: if not extractor.has_uncommitted_changes(): - console.print("[yellow]No uncommitted changes found.[/yellow]") - console.print("[dim]Make some changes and try again.[/dim]") + console.print("[yellow]No uncommitted changes found. Falling back to analyzing previous commit.[/yellow]") + analyze_previous = True else: console.print("[yellow]No uncommitted Python files found.[/yellow]") console.print("[dim]Modify some Python files and try again.[/dim]") - return + return - console.print("[bold]Analyzing uncommitted changes impact[/bold]") console.print(f"Repository: {repo_path}") - console.print(f"Changed Python files: {len(changed_files)}") - - baseline_service = BaselineService() console.print("\n[yellow]Computing repository baseline...[/yellow]") baseline = baseline_service.get_or_compute_baseline( @@ -363,32 +373,47 @@ def current_impact( console.print(f"[green]✓ Baseline computed from {baseline.total_commits_analyzed} commits[/green]") - console.print("\n[yellow]Analyzing uncommitted changes...[/yellow]") - - current_impact_service = CurrentImpactService() try: - analysis = current_impact_service.analyze_uncommitted_changes(repo_path, baseline) + if analyze_previous: + console.print("\n[yellow]Analyzing previous commit...[/yellow]") + analysis = current_impact_service.analyze_previous_commit(repo_path, baseline) - if not analysis: - console.print("[red]Failed to analyze uncommitted changes.[/red]") - return + if not analysis: + console.print("[red]Failed to analyze previous commit.[/red]") + console.print("[dim]Ensure the repository has at least 2 commits with Python changes.[/dim]") + return - try: - from slopometry.core.coverage_analyzer import CoverageAnalyzer + console.print(f"Changed Python files in previous commit: {len(analysis.changed_files)}") + else: + console.print("[bold]Analyzing uncommitted changes impact[/bold]") + console.print(f"Changed Python files: {len(changed_files)}") + console.print("\n[yellow]Analyzing uncommitted changes...[/yellow]") + analysis = current_impact_service.analyze_uncommitted_changes(repo_path, baseline) - coverage_analyzer = CoverageAnalyzer(repo_path) - coverage_result = coverage_analyzer.analyze_coverage() + if not analysis: + console.print("[red]Failed to analyze uncommitted changes.[/red]") + return - if coverage_result.coverage_available: - analysis.current_metrics.test_coverage_percent = coverage_result.total_coverage_percent - analysis.current_metrics.test_coverage_source = coverage_result.source_file - except Exception as e: - logger.debug(f"Coverage analysis failed (optional): {e}") + try: + from slopometry.core.coverage_analyzer import CoverageAnalyzer + + coverage_analyzer = CoverageAnalyzer(repo_path) + coverage_result = coverage_analyzer.analyze_coverage() + + if coverage_result.coverage_available: + analysis.current_metrics.test_coverage_percent = coverage_result.total_coverage_percent + analysis.current_metrics.test_coverage_source = coverage_result.source_file + except Exception as e: + logger.debug(f"Coverage analysis failed (optional): {e}") - display_current_impact_analysis(analysis, show_file_details=file_details) + if pager: + with console.pager(styles=True): + display_current_impact_analysis(analysis, show_file_details=file_details) + else: + display_current_impact_analysis(analysis, show_file_details=file_details) except Exception as e: - console.print(f"[red]Failed to analyze uncommitted changes: {e}[/red]") + console.print(f"[red]Failed to analyze changes: {e}[/red]") sys.exit(1) @@ -963,6 +988,17 @@ def qpe(repo_path: Path | None, output_json: bool) -> None: if repo_path is None: repo_path = Path.cwd() + from slopometry.core.project_guard import MultiProjectError, guard_single_project + + try: + guard_single_project(repo_path) + except MultiProjectError as e: + if output_json: + print(f'{{"error": "{e.project_count} git repositories found. Run from within a specific project."}}') + else: + console.print(f"[red]Error:[/red] {e}") + return + from slopometry.core.language_guard import check_language_support from slopometry.core.models import ProjectLanguage diff --git a/src/slopometry/summoner/services/baseline_service.py b/src/slopometry/summoner/services/baseline_service.py index 17f201e..d81dd60 100644 --- a/src/slopometry/summoner/services/baseline_service.py +++ b/src/slopometry/summoner/services/baseline_service.py @@ -17,6 +17,7 @@ RepoBaseline, ) from slopometry.core.settings import settings +from slopometry.summoner.services.qpe_calculator import QPECalculator logger = logging.getLogger(__name__) @@ -36,6 +37,7 @@ class CommitDelta: cc_delta: float effort_delta: float mi_delta: float + qpe_delta: float def _compute_single_delta_task(repo_path: Path, parent_sha: str, child_sha: str) -> CommitDelta | None: @@ -44,13 +46,14 @@ def _compute_single_delta_task(repo_path: Path, parent_sha: str, child_sha: str) NOTE: Must be at module level because ProcessPoolExecutor requires picklable callables. """ git_tracker = GitTracker(repo_path) + qpe_calculator = QPECalculator() parent_dir = None child_dir = None try: changed_files = git_tracker.get_changed_python_files(parent_sha, child_sha) if not changed_files: - return CommitDelta(cc_delta=0.0, effort_delta=0.0, mi_delta=0.0) + return CommitDelta(cc_delta=0.0, effort_delta=0.0, mi_delta=0.0, qpe_delta=0.0) parent_dir = git_tracker.extract_specific_files_from_commit(parent_sha, changed_files) child_dir = git_tracker.extract_specific_files_from_commit(child_sha, changed_files) @@ -66,15 +69,18 @@ def _compute_single_delta_task(repo_path: Path, parent_sha: str, child_sha: str) parent_cc = parent_metrics.total_complexity if parent_metrics else 0 parent_effort = parent_metrics.total_effort if parent_metrics else 0.0 parent_mi = parent_metrics.total_mi if parent_metrics else 0.0 + parent_qpe = qpe_calculator.calculate_qpe(parent_metrics).qpe if parent_metrics else 0.0 child_cc = child_metrics.total_complexity if child_metrics else 0 child_effort = child_metrics.total_effort if child_metrics else 0.0 child_mi = child_metrics.total_mi if child_metrics else 0.0 + child_qpe = qpe_calculator.calculate_qpe(child_metrics).qpe if child_metrics else 0.0 return CommitDelta( cc_delta=child_cc - parent_cc, effort_delta=child_effort - parent_effort, mi_delta=child_mi - parent_mi, + qpe_delta=child_qpe - parent_qpe, ) except GitOperationError as e: @@ -116,7 +122,7 @@ def get_or_compute_baseline( if not recompute: cached = self.db.get_cached_baseline(str(repo_path), head_sha) - if cached: + if cached and cached.qpe_stats is not None: return cached baseline = self.compute_full_baseline(repo_path, max_workers=max_workers) @@ -157,13 +163,16 @@ def compute_full_baseline(self, repo_path: Path, max_workers: int = 4) -> RepoBa cc_deltas = [d.cc_delta for d in deltas] effort_deltas = [d.effort_delta for d in deltas] mi_deltas = [d.mi_delta for d in deltas] + qpe_deltas = [d.qpe_delta for d in deltas] - # commits are in reverse chronological order (newest first, oldest last) newest_commit_date = commits[0].timestamp oldest_commit_date = commits[-1].timestamp oldest_commit_tokens = self._get_commit_token_count(repo_path, commits[-1].sha, analyzer) + qpe_calculator = QPECalculator() + current_qpe = qpe_calculator.calculate_qpe(current_metrics) + return RepoBaseline( repository_path=str(repo_path), computed_at=datetime.now(), @@ -176,6 +185,8 @@ def compute_full_baseline(self, repo_path: Path, max_workers: int = 4) -> RepoBa oldest_commit_date=oldest_commit_date, newest_commit_date=newest_commit_date, oldest_commit_tokens=oldest_commit_tokens, + qpe_stats=self._compute_stats("qpe_delta", qpe_deltas), + current_qpe=current_qpe, ) def _get_all_commits(self, repo_path: Path) -> list[CommitInfo]: diff --git a/src/slopometry/summoner/services/current_impact_service.py b/src/slopometry/summoner/services/current_impact_service.py index bd5c997..5708ed3 100644 --- a/src/slopometry/summoner/services/current_impact_service.py +++ b/src/slopometry/summoner/services/current_impact_service.py @@ -7,6 +7,7 @@ from slopometry.core.complexity_analyzer import ComplexityAnalyzer from slopometry.core.models import ( + AnalysisSource, ComplexityDelta, CurrentChangesAnalysis, ExtendedComplexityMetrics, @@ -15,6 +16,7 @@ ) from slopometry.core.working_tree_extractor import WorkingTreeExtractor from slopometry.summoner.services.impact_calculator import ImpactCalculator +from slopometry.summoner.services.qpe_calculator import QPECalculator logger = logging.getLogger(__name__) @@ -80,8 +82,8 @@ def analyze_uncommitted_changes( for file_path in changed_files: if file_path in cov_result.file_coverage: filtered_coverage[file_path] = cov_result.file_coverage[file_path] - except Exception: - pass + except Exception as e: + logger.debug(f"Coverage analysis failed (optional feature): {e}") blind_spot_tokens = 0 changed_files_tokens = 0 @@ -116,6 +118,118 @@ def get_token_count(path_str: str) -> int: galen_metrics=galen_metrics, ) + def analyze_previous_commit( + self, + repo_path: Path, + baseline: RepoBaseline, + ) -> CurrentChangesAnalysis | None: + """Analyze the previous commit (HEAD) against its parent (HEAD~1). + + Used as fallback when there are no uncommitted changes. + + Args: + repo_path: Path to the repository + baseline: Pre-computed repository baseline + + Returns: + CurrentChangesAnalysis or None if analysis fails + """ + from slopometry.core.git_tracker import GitOperationError, GitTracker + + repo_path = repo_path.resolve() + git_tracker = GitTracker(repo_path) + analyzer = ComplexityAnalyzer(working_directory=repo_path) + + if not git_tracker.has_previous_commit(): + return None + + head_sha = git_tracker._get_current_commit_sha() + if not head_sha: + return None + + import subprocess + + try: + result = subprocess.run( + ["git", "rev-parse", "HEAD~1"], + cwd=repo_path, + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode != 0: + logger.debug(f"git rev-parse HEAD~1 failed: {result.stderr.strip()}") + return None + parent_sha = result.stdout.strip() + except Exception as e: + logger.debug(f"Failed to get parent commit SHA: {e}") + return None + + try: + changed_files = git_tracker.get_changed_python_files(parent_sha, head_sha) + except GitOperationError as e: + logger.debug(f"Failed to get changed files between {parent_sha[:8]} and {head_sha[:8]}: {e}") + return None + + if not changed_files: + return None + + try: + with git_tracker.extract_files_from_commit_ctx(parent_sha) as parent_dir: + with git_tracker.extract_files_from_commit_ctx(head_sha) as head_dir: + if not parent_dir or not head_dir: + logger.debug( + f"Failed to extract files from commits: parent_dir={parent_dir}, head_dir={head_dir}" + ) + return None + + parent_metrics = analyzer.analyze_extended_complexity(parent_dir) + head_metrics = analyzer.analyze_extended_complexity(head_dir) + except GitOperationError as e: + logger.debug(f"Failed to extract or analyze commits {parent_sha[:8]}..{head_sha[:8]}: {e}") + return None + + # Use parent as baseline, HEAD as current + current_delta = self._compute_delta(parent_metrics, head_metrics) + assessment = self.impact_calculator.calculate_impact(current_delta, baseline) + + from slopometry.core.context_coverage_analyzer import ContextCoverageAnalyzer + + coverage_analyzer = ContextCoverageAnalyzer(repo_path) + blind_spots = coverage_analyzer.get_affected_dependents(set(changed_files)) + + blind_spot_tokens = 0 + changed_files_tokens = 0 + + for file_path in changed_files: + changed_files_tokens += head_metrics.files_by_token_count.get(file_path, 0) + + for file_path in blind_spots: + blind_spot_tokens += head_metrics.files_by_token_count.get(file_path, 0) + + complete_picture_context_size = changed_files_tokens + blind_spot_tokens + + galen_metrics = self._calculate_galen_metrics(baseline, head_metrics) + + return CurrentChangesAnalysis( + repository_path=str(repo_path), + analysis_timestamp=datetime.now(), + source=AnalysisSource.PREVIOUS_COMMIT, + analyzed_commit_sha=head_sha[:8], + base_commit_sha=parent_sha[:8], + changed_files=changed_files, + current_metrics=head_metrics, + baseline_metrics=parent_metrics, + assessment=assessment, + baseline=baseline, + blind_spots=blind_spots, + filtered_coverage=None, # Coverage not meaningful for committed changes + blind_spot_tokens=blind_spot_tokens, + changed_files_tokens=changed_files_tokens, + complete_picture_context_size=complete_picture_context_size, + galen_metrics=galen_metrics, + ) + def _calculate_galen_metrics( self, baseline: RepoBaseline, @@ -150,6 +264,10 @@ def _compute_delta( current_metrics: ExtendedComplexityMetrics, ) -> ComplexityDelta: """Compute complexity delta between baseline and current metrics.""" + qpe_calculator = QPECalculator() + baseline_qpe = qpe_calculator.calculate_qpe(baseline_metrics).qpe + current_qpe = qpe_calculator.calculate_qpe(current_metrics).qpe + return ComplexityDelta( total_complexity_change=(current_metrics.total_complexity - baseline_metrics.total_complexity), avg_complexity_change=(current_metrics.average_complexity - baseline_metrics.average_complexity), @@ -161,4 +279,5 @@ def _compute_delta( total_mi_change=current_metrics.total_mi - baseline_metrics.total_mi, avg_mi_change=current_metrics.average_mi - baseline_metrics.average_mi, net_files_change=(current_metrics.total_files_analyzed - baseline_metrics.total_files_analyzed), + qpe_change=current_qpe - baseline_qpe, ) diff --git a/src/slopometry/summoner/services/experiment_orchestrator.py b/src/slopometry/summoner/services/experiment_orchestrator.py index 673d227..7ffb7ce 100644 --- a/src/slopometry/summoner/services/experiment_orchestrator.py +++ b/src/slopometry/summoner/services/experiment_orchestrator.py @@ -7,7 +7,6 @@ from datetime import datetime from pathlib import Path -from rich.console import Console from rich.table import Table from slopometry.core.complexity_analyzer import ComplexityAnalyzer @@ -15,6 +14,7 @@ from slopometry.core.database import EventDatabase from slopometry.core.git_tracker import GitOperationError, GitTracker from slopometry.core.models import ExperimentProgress, ExperimentRun, ExperimentStatus, ExtendedComplexityMetrics +from slopometry.display.console import console from slopometry.summoner.services.cli_calculator import CLICalculator from slopometry.summoner.services.worktree_manager import WorktreeManager @@ -187,8 +187,6 @@ def analyze_commit_chain(self, base_commit: str, head_commit: str) -> None: base_commit: Starting commit (e.g., HEAD~10) head_commit: Ending commit (e.g., HEAD) """ - console = Console() - result = subprocess.run( ["git", "rev-list", "--reverse", f"{base_commit}..{head_commit}"], cwd=self.repo_path, diff --git a/src/slopometry/summoner/services/impact_calculator.py b/src/slopometry/summoner/services/impact_calculator.py index c50bd1e..ef7776b 100644 --- a/src/slopometry/summoner/services/impact_calculator.py +++ b/src/slopometry/summoner/services/impact_calculator.py @@ -64,8 +64,17 @@ def calculate_impact( baseline.mi_delta_stats.std_dev, ) + qpe_delta = staged_delta.qpe_change + qpe_z = 0.0 + if baseline.qpe_stats: + qpe_z = self._safe_z_score( + qpe_delta, + baseline.qpe_stats.mean, + baseline.qpe_stats.std_dev, + ) + # NOTE: Normalize z-score directions for impact scoring: - # CC/Effort: negate (lower=better), MI: keep (higher=better) + # CC/Effort: negate (lower=better), MI/QPE: keep (higher=better) normalized_cc = -cc_z normalized_effort = -effort_z normalized_mi = mi_z @@ -87,6 +96,8 @@ def calculate_impact( cc_delta=cc_delta, effort_delta=effort_delta, mi_delta=mi_delta, + qpe_delta=qpe_delta, + qpe_z_score=qpe_z, ) def _safe_z_score(self, value: float, mean: float, std: float) -> float: diff --git a/src/slopometry/summoner/services/qpe_calculator.py b/src/slopometry/summoner/services/qpe_calculator.py index 4037248..ace874b 100644 --- a/src/slopometry/summoner/services/qpe_calculator.py +++ b/src/slopometry/summoner/services/qpe_calculator.py @@ -1,14 +1,14 @@ """Quality-Per-Effort (QPE) calculator for principled code quality comparison. -QPE provides a single metric for: -1. GRPO rollout comparison (same-spec implementations) -2. Cross-project comparison +Provides two metrics for different use cases: +- qpe: Effort-normalized for GRPO rollout comparison (same spec) +- qpe_absolute: Raw quality for cross-project/temporal comparison Key properties: - Uses MI as sole quality signal (no double-counting with CC/Volume) -- Normalizes by Halstead Effort for fair comparison -- Includes code smell penalties with explicit weights -- Bounded output via tanh for stable RL training +- Sigmoid-saturated smell penalty (configurable steepness) +- All smells weighted equally regardless of file complexity +- Bounded GRPO advantage via tanh for stable RL training """ import math @@ -21,37 +21,36 @@ ProjectQPEResult, QPEScore, ) +from slopometry.core.settings import settings class QPECalculator: """Quality-Per-Effort calculator for principled comparison.""" - # Smell weights with explicit rationale - # Sum to ~0.7 so maximum penalty (all smells present) approaches 0.5 cap - SMELL_WEIGHTS: dict[str, float] = { - "hasattr_getattr": 0.10, # Indicates missing domain models - "swallowed_exception": 0.15, # Can hide real bugs - "type_ignore": 0.08, # Type system bypass - "dynamic_execution": 0.12, # Security/maintainability risk - "test_skip": 0.10, # Missing coverage - "dict_get_with_default": 0.05, # Minor modeling gap - "inline_import": 0.03, # Style issue - "orphan_comment": 0.02, # Documentation noise - "untracked_todo": 0.02, # Debt tracking - "nonempty_init": 0.03, # Structural issue - } - def calculate_qpe(self, metrics: ExtendedComplexityMetrics) -> QPEScore: """Calculate Quality-Per-Effort score. Formula: - QPE = adjusted_quality / effort_factor + qpe = adjusted_quality / effort_factor (for GRPO) + qpe_absolute = adjusted_quality (for cross-project/temporal) Where: - adjusted_quality = mi_normalized * (1 - smell_penalty) + adjusted_quality = mi_normalized * (1 - smell_penalty) + bonuses mi_normalized = average_mi / 100.0 - smell_penalty = min(weighted_smell_sum / files_analyzed, 0.5) + smell_penalty = 0.9 * (1 - exp(-smell_penalty_raw * steepness)) + smell_penalty_raw = weighted_smell_sum / effective_files effort_factor = log(total_halstead_effort + 1) + bonuses = test_bonus + type_bonus + docstring_bonus + + Smell penalty uses: + - Effective files (files with min LOC) to prevent gaming via tiny files + - No effort multiplier: all smells penalize equally + - Sigmoid saturation instead of hard cap + + Bonuses (positive signals): + - Test coverage bonus when >= threshold + - Type hint coverage bonus when >= threshold + - Docstring coverage bonus when >= threshold Args: metrics: Extended complexity metrics for the codebase @@ -59,40 +58,59 @@ def calculate_qpe(self, metrics: ExtendedComplexityMetrics) -> QPEScore: Returns: QPEScore with component breakdown """ - # 1. Quality signal: MI (0-100) normalized to 0-1 mi_normalized = metrics.average_mi / 100.0 - # 2. Collect smell counts and compute weighted penalty - smell_counts: dict[str, int] = { - "hasattr_getattr": metrics.hasattr_getattr_count, - "swallowed_exception": metrics.swallowed_exception_count, - "type_ignore": metrics.type_ignore_count, - "dynamic_execution": metrics.dynamic_execution_count, - "test_skip": metrics.test_skip_count, - "dict_get_with_default": metrics.dict_get_with_default_count, - "inline_import": metrics.inline_import_count, - "orphan_comment": metrics.orphan_comment_count, - "untracked_todo": metrics.untracked_todo_count, - "nonempty_init": metrics.nonempty_init_count, - } - - weighted_smell_sum = sum(smell_counts[smell_name] * weight for smell_name, weight in self.SMELL_WEIGHTS.items()) - - # Normalize by file count and cap at 0.5 - files_analyzed = max(metrics.total_files_analyzed, 1) - smell_penalty = min(weighted_smell_sum / files_analyzed, 0.5) - - # 3. Adjusted quality - adjusted_quality = mi_normalized * (1 - smell_penalty) - - # 4. Effort normalization using log for diminishing returns + smell_counts: dict[str, int] = {} + weighted_smell_sum = 0.0 + + for smell in metrics.get_smells(): + smell_counts[smell.name] = smell.count + weighted_smell_sum += smell.count * smell.weight + + # Use files_by_loc for anti-gaming file filtering, fallback to total_files + if metrics.files_by_loc: + effective_files = sum(1 for loc in metrics.files_by_loc.values() if loc >= settings.qpe_min_loc_per_file) + else: + effective_files = metrics.total_files_analyzed + + total_files = max(effective_files, 1) + smell_penalty_raw = weighted_smell_sum / total_files + + # Sigmoid saturation with configurable steepness (approaches 0.9 asymptotically) + smell_penalty = 0.9 * (1 - math.exp(-smell_penalty_raw * settings.qpe_sigmoid_steepness)) + + # Positive bonuses (configurable thresholds and amounts) + test_bonus = ( + settings.qpe_test_coverage_bonus + if (metrics.test_coverage_percent or 0) >= settings.qpe_test_coverage_threshold + else 0.0 + ) + type_bonus = ( + settings.qpe_type_coverage_bonus + if metrics.type_hint_coverage >= settings.qpe_type_coverage_threshold + else 0.0 + ) + docstring_bonus = ( + settings.qpe_docstring_coverage_bonus + if metrics.docstring_coverage >= settings.qpe_docstring_coverage_threshold + else 0.0 + ) + total_bonus = test_bonus + type_bonus + docstring_bonus + + adjusted_quality = mi_normalized * (1 - smell_penalty) + total_bonus + + # Effort normalization using log for diminishing returns effort_factor = math.log(metrics.total_effort + 1) - # 5. QPE: quality per log-effort (higher = better) + # qpe: effort-normalized for GRPO rollouts qpe = adjusted_quality / effort_factor if effort_factor > 0 else 0.0 + # qpe_absolute: raw quality for cross-project/temporal comparison + qpe_absolute = adjusted_quality + return QPEScore( qpe=qpe, + qpe_absolute=qpe_absolute, mi_normalized=mi_normalized, smell_penalty=smell_penalty, adjusted_quality=adjusted_quality, @@ -164,7 +182,6 @@ def compare( ) ) - # Sort by QPE (highest first) rankings = sorted(results, key=lambda x: x.qpe_score.qpe, reverse=True) return CrossProjectComparison( @@ -200,7 +217,6 @@ def compare_metrics( ) ) - # Sort by QPE (highest first) rankings = sorted(results, key=lambda x: x.qpe_score.qpe, reverse=True) return CrossProjectComparison( diff --git a/src/slopometry/summoner/services/user_story_service.py b/src/slopometry/summoner/services/user_story_service.py index fba59ed..2e052f6 100644 --- a/src/slopometry/summoner/services/user_story_service.py +++ b/src/slopometry/summoner/services/user_story_service.py @@ -4,13 +4,11 @@ from pathlib import Path import click -from rich.console import Console from slopometry.core.database import EventDatabase from slopometry.core.models import UserStoryDisplayData, UserStoryEntry, UserStoryStatistics from slopometry.core.settings import settings - -console = Console() +from slopometry.display.console import console class UserStoryService: diff --git a/tests/test_baseline_service.py b/tests/test_baseline_service.py index 2cb3278..5dae838 100644 --- a/tests/test_baseline_service.py +++ b/tests/test_baseline_service.py @@ -6,7 +6,7 @@ from conftest import make_test_metrics -from slopometry.core.models import ExtendedComplexityMetrics, HistoricalMetricStats, RepoBaseline +from slopometry.core.models import ExtendedComplexityMetrics, HistoricalMetricStats, QPEScore, RepoBaseline from slopometry.summoner.services.baseline_service import ( BaselineService, CommitInfo, @@ -167,6 +167,25 @@ def test_get_or_compute_baseline__returns_cached_when_head_unchanged(self, tmp_p current_metrics=ExtendedComplexityMetrics(**make_test_metrics(total_complexity=100)), oldest_commit_date=datetime.now(), newest_commit_date=datetime.now(), + qpe_stats=HistoricalMetricStats( + metric_name="qpe_delta", + mean=0.001, + std_dev=0.005, + median=0.001, + min_value=-0.01, + max_value=0.02, + sample_count=10, + trend_coefficient=0.0, + ), + current_qpe=QPEScore( + qpe=0.03, + qpe_absolute=0.45, + mi_normalized=0.5, + smell_penalty=0.1, + adjusted_quality=0.45, + effort_factor=15.0, + smell_counts={}, + ), ) mock_db.get_cached_baseline.return_value = cached_baseline diff --git a/tests/test_code_quality_cache.py b/tests/test_code_quality_cache.py index a2a61b4..c8ab95d 100644 --- a/tests/test_code_quality_cache.py +++ b/tests/test_code_quality_cache.py @@ -112,3 +112,30 @@ def test_working_tree_hash_support(self, db_connection): # Retrieve clean (None hash) -> Miss metrics, _ = manager.get_cached_metrics("sess_1", "/repo", "sha1", working_tree_hash=None) assert metrics is None + + def test_update_cached_coverage__updates_existing_entry(self, db_connection): + """Test that update_cached_coverage updates coverage fields in cached metrics.""" + manager = CodeQualityCacheManager(db_connection) + dummy_metrics = ExtendedComplexityMetrics(**make_test_metrics(total_complexity=50)) + + # Save initial metrics without coverage + manager.save_metrics_to_cache("sess_1", "/repo", "sha1", dummy_metrics) + + # Verify initial state has no coverage + metrics, _ = manager.get_cached_metrics("sess_1", "/repo", "sha1") + assert metrics.test_coverage_percent is None + + # Update with coverage + success = manager.update_cached_coverage("sess_1", 85.5, "coverage.xml") + assert success is True + + # Verify coverage was cached + metrics, _ = manager.get_cached_metrics("sess_1", "/repo", "sha1") + assert metrics.test_coverage_percent == 85.5 + assert metrics.test_coverage_source == "coverage.xml" + + def test_update_cached_coverage__returns_false_for_missing_session(self, db_connection): + """Test that update_cached_coverage returns False if session doesn't exist.""" + manager = CodeQualityCacheManager(db_connection) + success = manager.update_cached_coverage("nonexistent", 75.0, "coverage.xml") + assert success is False diff --git a/tests/test_current_impact_service.py b/tests/test_current_impact_service.py index 3ad97b7..73087fc 100644 --- a/tests/test_current_impact_service.py +++ b/tests/test_current_impact_service.py @@ -1,3 +1,4 @@ +import logging import subprocess from datetime import datetime from pathlib import Path @@ -5,7 +6,7 @@ import pytest from slopometry.core.complexity_analyzer import ComplexityAnalyzer -from slopometry.core.models import HistoricalMetricStats, RepoBaseline +from slopometry.core.models import AnalysisSource, HistoricalMetricStats, RepoBaseline from slopometry.summoner.services.current_impact_service import CurrentImpactService @@ -22,7 +23,6 @@ def real_baseline(self): analyzer = ComplexityAnalyzer(working_directory=source_repo) try: metrics = analyzer.analyze_extended_complexity() - # Create dummy stats for required fields dummy_stats = HistoricalMetricStats( metric_name="test_metric", mean=0.0, @@ -50,13 +50,11 @@ def real_baseline(self): @pytest.fixture def test_repo_path(self, tmp_path): """Create a temporary clone of the current repository.""" - # Use the actual current repo as source source_repo = Path.cwd() assert (source_repo / ".git").exists(), "Test must run from within the repository" dest_repo_path = tmp_path / "repo" - # Clone using git command to ensuring clean state subprocess.run(["git", "clone", str(source_repo), str(dest_repo_path)], check=True, capture_output=True) return dest_repo_path @@ -98,13 +96,153 @@ def test_analyze_uncommitted_changes__detects_changes(self, test_repo_path, real with open(target_file, "a") as f: f.write("\n\ndef complex_function(x):\n if x > 0:\n return x\n else:\n return -x\n") - # Analyze result = service.analyze_uncommitted_changes(test_repo_path, real_baseline) assert result is not None assert result.changed_files assert str(target_file.relative_to(test_repo_path)) in result.changed_files - # Should have some delta - # Since we modified the file, current complexity should differ from baseline - assert result.current_metrics.total_complexity != real_baseline.current_metrics.total_complexity + # Should have current metrics computed + # Note: We don't assert total_complexity differs because the baseline comes from + # the source repo's uncommitted state, which may coincidentally have the same + # total complexity as the clone with our appended function + assert result.current_metrics is not None + assert result.current_metrics.total_files_analyzed > 0 + assert result.current_metrics.total_complexity > 0 + + def test_analyze_previous_commit__returns_analysis_with_correct_source(self, test_repo_path, real_baseline): + """Test that analyzing previous commit sets the correct source.""" + assert real_baseline is not None, "Baseline computation failed" + + service = CurrentImpactService() + + # The test_repo_path is a clone with commits, so previous commit should exist + result = service.analyze_previous_commit(test_repo_path, real_baseline) + + # Should return analysis (assuming commits have Python changes) + if result is not None: + assert result.source == AnalysisSource.PREVIOUS_COMMIT + assert result.analyzed_commit_sha is not None + assert result.base_commit_sha is not None + assert len(result.analyzed_commit_sha) == 8 # Short SHA + assert len(result.base_commit_sha) == 8 # Short SHA + + def test_analyze_previous_commit__returns_none_when_no_previous_commit(self, tmp_path, real_baseline): + """Test that analyze_previous_commit returns None for repos with only one commit.""" + assert real_baseline is not None, "Baseline computation failed" + + # Create a repo with only one commit + repo_path = tmp_path / "single_commit_repo" + repo_path.mkdir() + + subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=repo_path, + check=True, + capture_output=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], + cwd=repo_path, + check=True, + capture_output=True, + ) + subprocess.run( + ["git", "config", "commit.gpgsign", "false"], + cwd=repo_path, + check=True, + capture_output=True, + ) + + test_file = repo_path / "test.py" + test_file.write_text("def foo(): pass\n") + subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True) + subprocess.run( + ["git", "commit", "-m", "Initial"], + cwd=repo_path, + check=True, + capture_output=True, + ) + + service = CurrentImpactService() + result = service.analyze_previous_commit(repo_path, real_baseline) + + assert result is None + + def test_analyze_previous_commit__returns_none_when_no_python_changes(self, tmp_path, real_baseline): + """Test that analyze_previous_commit returns None when last commit has no Python changes.""" + assert real_baseline is not None, "Baseline computation failed" + + # Create a repo with two commits, but the last one has no Python changes + repo_path = tmp_path / "no_python_changes_repo" + repo_path.mkdir() + + subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=repo_path, + check=True, + capture_output=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], + cwd=repo_path, + check=True, + capture_output=True, + ) + subprocess.run( + ["git", "config", "commit.gpgsign", "false"], + cwd=repo_path, + check=True, + capture_output=True, + ) + + # First commit with Python file + test_file = repo_path / "test.py" + test_file.write_text("def foo(): pass\n") + subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True) + subprocess.run( + ["git", "commit", "-m", "Initial"], + cwd=repo_path, + check=True, + capture_output=True, + ) + + # Second commit with only a text file (no Python) + txt_file = repo_path / "readme.txt" + txt_file.write_text("Just a text file\n") + subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True) + subprocess.run( + ["git", "commit", "-m", "Add readme"], + cwd=repo_path, + check=True, + capture_output=True, + ) + + service = CurrentImpactService() + result = service.analyze_previous_commit(repo_path, real_baseline) + + # Should return None because no Python files were changed + assert result is None + + def test_analyze_previous_commit__logs_debug_on_git_operation_error( + self, test_repo_path, real_baseline, caplog, monkeypatch + ): + """Test that analyze_previous_commit logs debug messages on GitOperationError.""" + assert real_baseline is not None, "Baseline computation failed" + + from slopometry.core.git_tracker import GitOperationError, GitTracker + + def mock_get_changed_python_files(self, parent_sha, child_sha): + raise GitOperationError("Simulated git diff failure") + + monkeypatch.setattr(GitTracker, "get_changed_python_files", mock_get_changed_python_files) + + service = CurrentImpactService() + + with caplog.at_level(logging.DEBUG, logger="slopometry.summoner.services.current_impact_service"): + result = service.analyze_previous_commit(test_repo_path, real_baseline) + + assert result is None + assert any("Failed to get changed files" in record.message for record in caplog.records) diff --git a/tests/test_experiment_orchestrator.py b/tests/test_experiment_orchestrator.py index 1966d41..0eea4db 100644 --- a/tests/test_experiment_orchestrator.py +++ b/tests/test_experiment_orchestrator.py @@ -147,10 +147,8 @@ def test_analyze_commit_chain__skips_merge_commits(self, tmp_path: Path): patch("slopometry.summoner.services.experiment_orchestrator.GitTracker") as MockGit, patch("slopometry.summoner.services.experiment_orchestrator.CLICalculator"), patch("slopometry.summoner.services.experiment_orchestrator.subprocess.run") as mock_subprocess, - patch("slopometry.summoner.services.experiment_orchestrator.Console") as MockConsole, + patch("slopometry.summoner.services.experiment_orchestrator.console") as mock_console, ): - mock_console = MockConsole.return_value - # First call: git rev-list returns commits # Second+ calls: git rev-list --parents returns parent info def subprocess_side_effect(*args, **kwargs): @@ -207,10 +205,9 @@ def test_analyze_commit_chain__handles_empty_commit_range(self, tmp_path: Path): patch("slopometry.summoner.services.experiment_orchestrator.GitTracker") as MockGit, patch("slopometry.summoner.services.experiment_orchestrator.CLICalculator"), patch("slopometry.summoner.services.experiment_orchestrator.subprocess.run") as mock_subprocess, - patch("slopometry.summoner.services.experiment_orchestrator.Console"), + patch("slopometry.summoner.services.experiment_orchestrator.console"), patch("slopometry.summoner.services.experiment_orchestrator.ComplexityAnalyzer"), ): - def subprocess_side_effect(*args, **kwargs): result = MagicMock() result.returncode = 0 @@ -254,10 +251,8 @@ def test_analyze_commit_chain__handles_git_operation_error(self, tmp_path: Path) patch("slopometry.summoner.services.experiment_orchestrator.GitTracker") as MockGit, patch("slopometry.summoner.services.experiment_orchestrator.CLICalculator"), patch("slopometry.summoner.services.experiment_orchestrator.subprocess.run") as mock_subprocess, - patch("slopometry.summoner.services.experiment_orchestrator.Console") as MockConsole, + patch("slopometry.summoner.services.experiment_orchestrator.console") as mock_console, ): - mock_console = MockConsole.return_value - # Rev-list returns one commit def subprocess_side_effect(*args, **kwargs): result = MagicMock() diff --git a/tests/test_feedback_cache.py b/tests/test_feedback_cache.py index 40d6908..c63a9f5 100644 --- a/tests/test_feedback_cache.py +++ b/tests/test_feedback_cache.py @@ -7,8 +7,6 @@ import time from pathlib import Path -import pytest - from slopometry.core.hook_handler import ( _compute_feedback_cache_key, _get_feedback_cache_path, @@ -336,10 +334,6 @@ def test_get_modified_python_files__empty_when_no_changes(self): class TestSubmoduleHandling: """Tests for git submodule handling.""" - @pytest.mark.skipif( - subprocess.run(["git", "--version"], capture_output=True).returncode != 0, - reason="Git not available", - ) def test_feedback_cache__submodule_changes_dont_invalidate(self): """Verify submodule changes don't cause cache misses. diff --git a/tests/test_project_guard.py b/tests/test_project_guard.py new file mode 100644 index 0000000..ef42d34 --- /dev/null +++ b/tests/test_project_guard.py @@ -0,0 +1,128 @@ +"""Tests for project_guard.py.""" + +from pathlib import Path + +import pytest + +from slopometry.core.project_guard import ( + MultiProjectError, + detect_multi_project_directory, + guard_single_project, +) + + +class TestDetectMultiProjectDirectory: + """Tests for detect_multi_project_directory.""" + + def test_detect_multi_project_directory__single_project(self, tmp_path: Path) -> None: + """Single git repo returns one project.""" + (tmp_path / ".git").mkdir() + projects = detect_multi_project_directory(tmp_path) + assert projects == ["."] + + def test_detect_multi_project_directory__multiple_projects(self, tmp_path: Path) -> None: + """Multiple git repos in subdirs returns all.""" + proj1 = tmp_path / "project1" + proj2 = tmp_path / "project2" + proj1.mkdir() + proj2.mkdir() + (proj1 / ".git").mkdir() + (proj2 / ".git").mkdir() + + projects = detect_multi_project_directory(tmp_path) + assert sorted(projects) == ["project1", "project2"] + + def test_detect_multi_project_directory__respects_max_depth(self, tmp_path: Path) -> None: + """Projects beyond max_depth are not found.""" + deep = tmp_path / "level1" / "level2" / "level3" + deep.mkdir(parents=True) + (deep / ".git").mkdir() + + projects_shallow = detect_multi_project_directory(tmp_path, max_depth=2) + assert projects_shallow == [] + + projects_deep = detect_multi_project_directory(tmp_path, max_depth=3) + assert projects_deep == ["level1/level2/level3"] + + def test_detect_multi_project_directory__ignores_hidden_dirs(self, tmp_path: Path) -> None: + """Hidden directories are not searched.""" + hidden = tmp_path / ".hidden_project" + hidden.mkdir() + (hidden / ".git").mkdir() + + projects = detect_multi_project_directory(tmp_path) + assert projects == [] + + def test_detect_multi_project_directory__skips_submodules(self, tmp_path: Path) -> None: + """Git submodules are not counted as separate projects.""" + (tmp_path / ".git").mkdir() + submodule = tmp_path / "vendor" / "lib" + submodule.mkdir(parents=True) + (submodule / ".git").write_text("gitdir: ../../.git/modules/vendor/lib") + + projects = detect_multi_project_directory(tmp_path) + assert projects == ["."] + + def test_detect_multi_project_directory__nested_single_project(self, tmp_path: Path) -> None: + """Nested dirs with single git repo.""" + proj = tmp_path / "code" / "myproject" + proj.mkdir(parents=True) + (proj / ".git").mkdir() + + projects = detect_multi_project_directory(tmp_path) + assert projects == ["code/myproject"] + + def test_detect_multi_project_directory__no_git_repos(self, tmp_path: Path) -> None: + """No git repos returns empty list.""" + (tmp_path / "somefile.txt").write_text("hello") + projects = detect_multi_project_directory(tmp_path) + assert projects == [] + + +class TestGuardSingleProject: + """Tests for guard_single_project.""" + + def test_guard_single_project__passes_for_single_project(self, tmp_path: Path) -> None: + """Single project doesn't raise.""" + (tmp_path / ".git").mkdir() + guard_single_project(tmp_path) + + def test_guard_single_project__raises_for_multiple_projects(self, tmp_path: Path) -> None: + """Multiple projects raises MultiProjectError.""" + proj1 = tmp_path / "project1" + proj2 = tmp_path / "project2" + proj1.mkdir() + proj2.mkdir() + (proj1 / ".git").mkdir() + (proj2 / ".git").mkdir() + + with pytest.raises(MultiProjectError) as exc_info: + guard_single_project(tmp_path) + + assert exc_info.value.project_count == 2 + assert "project1" in exc_info.value.projects + assert "project2" in exc_info.value.projects + + def test_guard_single_project__raises_for_no_git_repo(self, tmp_path: Path) -> None: + """No git repo raises MultiProjectError with count 0.""" + with pytest.raises(MultiProjectError) as exc_info: + guard_single_project(tmp_path) + + assert exc_info.value.project_count == 0 + + +class TestMultiProjectError: + """Tests for MultiProjectError.""" + + def test_multi_project_error__message_format(self) -> None: + """Error message includes count and project names.""" + error = MultiProjectError(3, ["proj1", "proj2", "proj3"]) + assert "3 git repositories" in str(error) + assert "proj1" in str(error) + + def test_multi_project_error__truncates_long_lists(self) -> None: + """Long project lists are truncated in message.""" + projects = [f"proj{i}" for i in range(10)] + error = MultiProjectError(10, projects) + assert "..." in str(error) + assert "proj5" not in str(error) diff --git a/tests/test_qpe_calculator.py b/tests/test_qpe_calculator.py index 23cff5b..70d5c6e 100644 --- a/tests/test_qpe_calculator.py +++ b/tests/test_qpe_calculator.py @@ -81,8 +81,8 @@ def test_calculate_qpe__smell_penalty_reduces_adjusted_quality(self): expected_adjusted = qpe_score.mi_normalized * (1 - qpe_score.smell_penalty) assert abs(qpe_score.adjusted_quality - expected_adjusted) < 0.001 - def test_calculate_qpe__smell_penalty_capped_at_0_5(self): - """Test that smell penalty is capped at 0.5 even with many smells.""" + def test_calculate_qpe__smell_penalty_saturates_with_sigmoid(self): + """Test that smell penalty uses sigmoid saturation (approaches 0.9 asymptotically).""" calculator = QPECalculator() metrics = ExtendedComplexityMetrics( @@ -102,7 +102,58 @@ def test_calculate_qpe__smell_penalty_capped_at_0_5(self): qpe_score = calculator.calculate_qpe(metrics) - assert qpe_score.smell_penalty <= 0.5 + # Sigmoid approaches 0.9 asymptotically but never exceeds it + assert qpe_score.smell_penalty <= 0.9 + # With many smells, penalty should be high (close to saturation) + assert qpe_score.smell_penalty > 0.5 + + def test_calculate_qpe__spreading_smells_does_not_reduce_penalty(self): + """Test that spreading smells across files doesn't reduce penalty (anti-gaming fix).""" + calculator = QPECalculator() + + # Same smells, 1 file + metrics_concentrated = ExtendedComplexityMetrics( + **make_test_metrics( + total_effort=50000.0, + average_mi=75.0, + total_files_analyzed=10, # 10 total files + hasattr_getattr_count=10, + hasattr_getattr_files=["file1.py"], # All in 1 file + ) + ) + + # Same smells, 10 files + metrics_spread = ExtendedComplexityMetrics( + **make_test_metrics( + total_effort=50000.0, + average_mi=75.0, + total_files_analyzed=10, # 10 total files + hasattr_getattr_count=10, + hasattr_getattr_files=[f"file{i}.py" for i in range(10)], # Spread across 10 files + ) + ) + + qpe_concentrated = calculator.calculate_qpe(metrics_concentrated) + qpe_spread = calculator.calculate_qpe(metrics_spread) + + # Both should have the same smell penalty (normalizing by total files, not affected) + assert abs(qpe_concentrated.smell_penalty - qpe_spread.smell_penalty) < 0.001 + + def test_calculate_qpe__qpe_absolute_equals_adjusted_quality(self): + """Test that qpe_absolute equals adjusted_quality (no effort normalization).""" + calculator = QPECalculator() + + metrics = ExtendedComplexityMetrics( + **make_test_metrics( + total_effort=50000.0, + average_mi=75.0, + total_files_analyzed=10, + ) + ) + + qpe_score = calculator.calculate_qpe(metrics) + + assert qpe_score.qpe_absolute == qpe_score.adjusted_quality def test_calculate_qpe__effort_factor_uses_log_scale(self): """Test that effort factor uses log scale for diminishing returns.""" @@ -151,6 +202,7 @@ def test_grpo_advantage__returns_positive_when_candidate_is_better(self): """Test that advantage is positive when candidate has higher QPE.""" baseline = QPEScore( qpe=0.05, + qpe_absolute=0.63, mi_normalized=0.7, smell_penalty=0.1, adjusted_quality=0.63, @@ -159,6 +211,7 @@ def test_grpo_advantage__returns_positive_when_candidate_is_better(self): candidate = QPEScore( qpe=0.07, # Higher QPE + qpe_absolute=0.76, mi_normalized=0.8, smell_penalty=0.05, adjusted_quality=0.76, @@ -173,6 +226,7 @@ def test_grpo_advantage__returns_negative_when_candidate_is_worse(self): """Test that advantage is negative when candidate has lower QPE.""" baseline = QPEScore( qpe=0.07, + qpe_absolute=0.76, mi_normalized=0.8, smell_penalty=0.05, adjusted_quality=0.76, @@ -181,6 +235,7 @@ def test_grpo_advantage__returns_negative_when_candidate_is_worse(self): candidate = QPEScore( qpe=0.05, # Lower QPE + qpe_absolute=0.63, mi_normalized=0.7, smell_penalty=0.1, adjusted_quality=0.63, @@ -195,6 +250,7 @@ def test_grpo_advantage__returns_zero_when_qpe_matches(self): """Test that advantage is zero when QPE scores are equal.""" baseline = QPEScore( qpe=0.05, + qpe_absolute=0.63, mi_normalized=0.7, smell_penalty=0.1, adjusted_quality=0.63, @@ -203,6 +259,7 @@ def test_grpo_advantage__returns_zero_when_qpe_matches(self): candidate = QPEScore( qpe=0.05, # Same QPE + qpe_absolute=0.63, mi_normalized=0.7, smell_penalty=0.1, adjusted_quality=0.63, @@ -218,6 +275,7 @@ def test_grpo_advantage__bounded_between_minus_1_and_1(self): # Extreme improvement case baseline = QPEScore( qpe=0.01, + qpe_absolute=0.35, mi_normalized=0.5, smell_penalty=0.3, adjusted_quality=0.35, @@ -226,6 +284,7 @@ def test_grpo_advantage__bounded_between_minus_1_and_1(self): candidate = QPEScore( qpe=1.0, # 100x improvement + qpe_absolute=1.0, mi_normalized=1.0, smell_penalty=0.0, adjusted_quality=1.0, @@ -240,6 +299,7 @@ def test_grpo_advantage__bounded_between_minus_1_and_1(self): # Extreme degradation case worse_candidate = QPEScore( qpe=0.0001, # Much worse + qpe_absolute=0.05, mi_normalized=0.1, smell_penalty=0.5, adjusted_quality=0.05, @@ -254,6 +314,7 @@ def test_grpo_advantage__handles_zero_baseline(self): """Test that advantage handles zero baseline QPE gracefully.""" baseline = QPEScore( qpe=0.0, # Zero baseline + qpe_absolute=0.0, mi_normalized=0.0, smell_penalty=0.5, adjusted_quality=0.0, @@ -262,6 +323,7 @@ def test_grpo_advantage__handles_zero_baseline(self): candidate = QPEScore( qpe=0.05, + qpe_absolute=0.63, mi_normalized=0.7, smell_penalty=0.1, adjusted_quality=0.63, @@ -361,7 +423,8 @@ def test_qpe_cli_command__runs_without_error(self, repo_path: Path) -> None: assert result.returncode == 0, f"qpe command failed with: {result.stderr}" assert "Quality-Per-Effort Score" in result.stdout - assert "QPE:" in result.stdout + assert "QPE (GRPO):" in result.stdout + assert "Quality:" in result.stdout def test_qpe_cli_command__json_output_is_valid(self, repo_path: Path) -> None: """Test that --json flag produces valid JSON output.""" @@ -379,6 +442,7 @@ def test_qpe_cli_command__json_output_is_valid(self, repo_path: Path) -> None: qpe_data = json.loads(result.stdout) assert "qpe" in qpe_data + assert "qpe_absolute" in qpe_data assert "mi_normalized" in qpe_data assert "smell_penalty" in qpe_data assert "adjusted_quality" in qpe_data @@ -386,7 +450,9 @@ def test_qpe_cli_command__json_output_is_valid(self, repo_path: Path) -> None: assert "smell_counts" in qpe_data assert isinstance(qpe_data["qpe"], float) + assert isinstance(qpe_data["qpe_absolute"], float) assert qpe_data["qpe"] > 0 + assert qpe_data["qpe_absolute"] > 0 def test_qpe_calculator__real_codebase_produces_consistent_results(self, repo_path: Path) -> None: """Test QPE calculation on real codebase produces stable, sensible values.""" @@ -404,12 +470,16 @@ def test_qpe_calculator__real_codebase_produces_consistent_results(self, repo_pa # MI normalized should be in valid range (0-1) assert 0 <= qpe_score.mi_normalized <= 1 - # Smell penalty should be capped at 0.5 - assert 0 <= qpe_score.smell_penalty <= 0.5 + # Smell penalty uses sigmoid (saturates at 0.9) + assert 0 <= qpe_score.smell_penalty <= 0.9 - # Adjusted quality should be MI * (1 - smell_penalty) - expected_adjusted = qpe_score.mi_normalized * (1 - qpe_score.smell_penalty) - assert abs(qpe_score.adjusted_quality - expected_adjusted) < 0.001 + # Adjusted quality should be MI * (1 - smell_penalty) + bonuses + # The exact bonuses depend on coverage thresholds, so we verify the base formula holds + # and that any difference is explained by bonuses (in [0, 0.12] range: 0.05+0.05+0.02) + base_quality = qpe_score.mi_normalized * (1 - qpe_score.smell_penalty) + bonus_applied = qpe_score.adjusted_quality - base_quality + assert bonus_applied >= 0, "Bonuses should be non-negative" + assert bonus_applied <= 0.12 + 0.001, "Bonuses should not exceed max possible (0.05+0.05+0.02)" # Effort factor should be log(effort + 1) expected_effort_factor = math.log(metrics.total_effort + 1) @@ -439,10 +509,11 @@ def test_display_qpe_score__renders_without_error(self, repo_path: Path) -> None # This should not raise AttributeError: 'QPEScore' object has no attribute 'effort_tier' display_qpe_score(qpe_score, metrics) - def test_qpe_score_model__serializes_to_json_without_effort_tier(self) -> None: - """Test that QPEScore model serializes correctly without effort_tier field.""" + def test_qpe_score_model__serializes_to_json_with_both_qpe_metrics(self) -> None: + """Test that QPEScore model serializes correctly with both qpe and qpe_absolute.""" qpe_score = QPEScore( qpe=0.05, + qpe_absolute=0.63, mi_normalized=0.7, smell_penalty=0.1, adjusted_quality=0.63, @@ -453,11 +524,13 @@ def test_qpe_score_model__serializes_to_json_without_effort_tier(self) -> None: json_output = qpe_score.model_dump_json() assert "qpe" in json_output + assert "qpe_absolute" in json_output assert "effort_tier" not in json_output # Verify round-trip restored = QPEScore.model_validate_json(json_output) assert restored.qpe == 0.05 + assert restored.qpe_absolute == 0.63 assert restored.smell_counts["hasattr_getattr"] == 5 def test_qpe_calculator__handles_empty_codebase_gracefully(self, tmp_path: Path) -> None: diff --git a/tests/test_smell_registry.py b/tests/test_smell_registry.py new file mode 100644 index 0000000..9a1eacd --- /dev/null +++ b/tests/test_smell_registry.py @@ -0,0 +1,232 @@ +"""Tests for the smell registry and SmellData model.""" + +import pytest + +from slopometry.core.models import ( + SMELL_REGISTRY, + ComplexityDelta, + ExtendedComplexityMetrics, + SmellCategory, + SmellData, + get_smell_label, + get_smells_by_category, +) + + +class TestSmellRegistry: + """Tests for SMELL_REGISTRY completeness and consistency.""" + + def test_smell_registry__has_all_13_smells(self) -> None: + """Verify all expected smells are in the registry.""" + expected_smells = { + "orphan_comment", + "untracked_todo", + "swallowed_exception", + "test_skip", + "type_ignore", + "dynamic_execution", + "inline_import", + "dict_get_with_default", + "hasattr_getattr", + "nonempty_init", + # Abstraction smells (QPE v2) + "single_method_class", + "deep_inheritance", + "passthrough_wrapper", + } + assert set(SMELL_REGISTRY.keys()) == expected_smells + + def test_smell_registry__all_definitions_have_required_fields(self) -> None: + """Verify all smell definitions have required fields populated.""" + for name, defn in SMELL_REGISTRY.items(): + assert defn.internal_name == name, f"{name}: internal_name mismatch" + assert defn.label, f"{name}: missing label" + assert defn.category in SmellCategory, f"{name}: invalid category" + assert 0 < defn.weight <= 1.0, f"{name}: weight {defn.weight} out of range" + assert defn.guidance, f"{name}: missing guidance" + assert defn.count_field.endswith("_count"), f"{name}: invalid count_field" + assert defn.files_field.endswith("_files"), f"{name}: invalid files_field" + + def test_smell_registry__general_category_smells(self) -> None: + """Verify expected smells are categorized as GENERAL.""" + general_smells = { + "orphan_comment", + "untracked_todo", + "swallowed_exception", + "test_skip", + "type_ignore", + "dynamic_execution", + } + for name in general_smells: + assert SMELL_REGISTRY[name].category == SmellCategory.GENERAL + + def test_smell_registry__python_category_smells(self) -> None: + """Verify expected smells are categorized as PYTHON.""" + python_smells = { + "inline_import", + "dict_get_with_default", + "hasattr_getattr", + "nonempty_init", + # Abstraction smells (QPE v2) + "single_method_class", + "deep_inheritance", + "passthrough_wrapper", + } + for name in python_smells: + assert SMELL_REGISTRY[name].category == SmellCategory.PYTHON + + +class TestSmellHelpers: + """Tests for smell helper functions.""" + + def test_get_smell_label__returns_registry_label(self) -> None: + """Verify get_smell_label returns the registry label.""" + assert get_smell_label("orphan_comment") == "Orphan Comments" + assert get_smell_label("swallowed_exception") == "Swallowed Exceptions" + + def test_get_smell_label__handles_unknown_smell(self) -> None: + """Verify get_smell_label handles unknown smells gracefully.""" + assert get_smell_label("unknown_smell") == "Unknown Smell" + + def test_get_smells_by_category__returns_general_smells(self) -> None: + """Verify get_smells_by_category returns all GENERAL smells.""" + general = get_smells_by_category(SmellCategory.GENERAL) + assert len(general) == 6 + assert all(d.category == SmellCategory.GENERAL for d in general) + + def test_get_smells_by_category__returns_python_smells(self) -> None: + """Verify get_smells_by_category returns all PYTHON smells.""" + python = get_smells_by_category(SmellCategory.PYTHON) + assert len(python) == 7 # 4 original + 3 abstraction smells + assert all(d.category == SmellCategory.PYTHON for d in python) + + def test_get_smells_by_category__sorted_by_weight_descending(self) -> None: + """Verify get_smells_by_category returns smells sorted by weight (highest first).""" + general = get_smells_by_category(SmellCategory.GENERAL) + weights = [d.weight for d in general] + assert weights == sorted(weights, reverse=True) + + +class TestSmellData: + """Tests for SmellData model.""" + + def test_smell_data__provides_definition_via_property(self) -> None: + """Verify SmellData.definition returns the registry definition.""" + smell = SmellData(name="orphan_comment", count=5, files=["a.py", "b.py"]) + assert smell.definition == SMELL_REGISTRY["orphan_comment"] + + def test_smell_data__provides_label_via_property(self) -> None: + """Verify SmellData.label returns the display label.""" + smell = SmellData(name="swallowed_exception", count=3, files=["a.py"]) + assert smell.label == "Swallowed Exceptions" + + def test_smell_data__provides_category_via_property(self) -> None: + """Verify SmellData.category returns the smell category.""" + smell = SmellData(name="inline_import", count=10, files=["a.py"]) + assert smell.category == SmellCategory.PYTHON + + def test_smell_data__provides_weight_via_property(self) -> None: + """Verify SmellData.weight returns the smell weight.""" + smell = SmellData(name="test_skip", count=2, files=["test.py"]) + assert smell.weight == 0.10 + + def test_smell_data__is_frozen(self) -> None: + """Verify SmellData is immutable.""" + smell = SmellData(name="orphan_comment", count=5, files=["a.py"]) + with pytest.raises(Exception): # ValidationError in Pydantic + smell.count = 10 # type: ignore[misc] + + +class TestExtendedComplexityMetricsSmellMethods: + """Tests for smell-related methods on ExtendedComplexityMetrics.""" + + @pytest.fixture + def metrics_with_smells(self) -> ExtendedComplexityMetrics: + """Create metrics with various smells.""" + return ExtendedComplexityMetrics( + total_files_analyzed=10, + total_complexity=50, + average_complexity=5.0, + max_complexity=15, + min_complexity=1, + total_tokens=5000, + average_tokens=500.0, + max_tokens=1000, + min_tokens=100, + total_volume=10000.0, + total_effort=50000.0, + total_difficulty=5.0, + average_volume=1000.0, + average_effort=5000.0, + average_difficulty=0.5, + total_mi=850.0, + average_mi=85.0, + orphan_comment_count=3, + orphan_comment_files=["a.py", "b.py"], + swallowed_exception_count=1, + swallowed_exception_files=["error_handler.py"], + test_skip_count=0, + test_skip_files=[], + ) + + def test_get_smells__returns_all_smell_data(self, metrics_with_smells: ExtendedComplexityMetrics) -> None: + """Verify get_smells returns SmellData for all smells.""" + smells = metrics_with_smells.get_smells() + assert len(smells) == 13 # 10 original + 3 abstraction smells + assert all(isinstance(s, SmellData) for s in smells) + + def test_get_smells__includes_correct_counts(self, metrics_with_smells: ExtendedComplexityMetrics) -> None: + """Verify get_smells returns correct counts.""" + smells = {s.name: s for s in metrics_with_smells.get_smells()} + assert smells["orphan_comment"].count == 3 + assert smells["swallowed_exception"].count == 1 + assert smells["test_skip"].count == 0 + + def test_get_smells__includes_correct_files(self, metrics_with_smells: ExtendedComplexityMetrics) -> None: + """Verify get_smells returns correct file lists.""" + smells = {s.name: s for s in metrics_with_smells.get_smells()} + assert smells["orphan_comment"].files == ["a.py", "b.py"] + assert smells["swallowed_exception"].files == ["error_handler.py"] + assert smells["test_skip"].files == [] + + def test_get_smell_files__returns_name_to_files_mapping( + self, metrics_with_smells: ExtendedComplexityMetrics + ) -> None: + """Verify get_smell_files returns dict mapping smell names to files.""" + smell_files = metrics_with_smells.get_smell_files() + assert smell_files["orphan_comment"] == ["a.py", "b.py"] + assert smell_files["swallowed_exception"] == ["error_handler.py"] + assert smell_files["test_skip"] == [] + + def test_get_smell_counts__returns_name_to_count_mapping( + self, metrics_with_smells: ExtendedComplexityMetrics + ) -> None: + """Verify get_smell_counts returns dict mapping smell names to counts.""" + smell_counts = metrics_with_smells.get_smell_counts() + assert len(smell_counts) == 13 # 10 original + 3 abstraction smells + assert smell_counts["orphan_comment"] == 3 + assert smell_counts["swallowed_exception"] == 1 + assert smell_counts["test_skip"] == 0 + + +class TestComplexityDeltaSmellChanges: + """Tests for smell-related methods on ComplexityDelta.""" + + def test_get_smell_changes__returns_all_smell_changes(self) -> None: + """Verify get_smell_changes returns all smell change values.""" + delta = ComplexityDelta( + orphan_comment_change=2, + swallowed_exception_change=-1, + test_skip_change=0, + ) + changes = delta.get_smell_changes() + assert len(changes) == 13 # 10 original + 3 abstraction smells + assert changes["orphan_comment"] == 2 + assert changes["swallowed_exception"] == -1 + assert changes["test_skip"] == 0 + + def test_get_smell_changes__default_values_are_zero(self) -> None: + """Verify get_smell_changes returns 0 for unset changes.""" + delta = ComplexityDelta() + changes = delta.get_smell_changes() + assert all(v == 0 for v in changes.values()) diff --git a/uv.lock b/uv.lock index 4461daf..d939608 100644 --- a/uv.lock +++ b/uv.lock @@ -2750,7 +2750,7 @@ wheels = [ [[package]] name = "slopometry" -version = "20260113.post1" +version = "20260117.post1" source = { editable = "." } dependencies = [ { name = "click" },