Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 172 additions & 50 deletions src/pyspector/reporting.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,90 @@
import json
import html as html_module
# Added 'Region' to imports for better SARIF compliance
from sarif_om import SarifLog, Tool, Run, ReportingDescriptor, Result, ArtifactLocation, Location, PhysicalLocation, Region
# Removed 'asdict' from imports as it is not needed for sarif_om
from dataclasses import asdict, is_dataclass
from sarif_om import (
SarifLog,
Tool,
ToolComponent,
Run,
ReportingDescriptor,
MultiformatMessageString,
Result,
ArtifactLocation,
Location,
PhysicalLocation,
Region,
Message,
)

# Maps internal severity levels to SARIF-compliant level strings.
# Keys are upper-case severity names (the same names to_console() uses for
# its ordering). to_sarif() looks severities up here and falls back to
# "warning" for any name that is not listed.
_SEVERITY_TO_SARIF_LEVEL = {
"CRITICAL": "error",
"HIGH": "error",
"MEDIUM": "warning",
"LOW": "note",
}

# Tool version that to_sarif() reports as the SARIF driver's `version`.
# NOTE(review): hard-coded here — confirm it stays in sync with the released
# package version.
_PYSPECTOR_VERSION = "1.0.0"

def _clean(obj):
"""
Recursively serialize a sarif_om object to a plain dict,
dropping any key whose value is None so the output stays lean.
sarif_om objects expose their data via __dict__; we walk that
structure and strip falsy-None leaves.
"""
if isinstance(obj, list):
return [_clean(item) for item in obj]
if hasattr(obj, "__dict__"):
return {
k: _clean(v)
for k, v in obj.__dict__.items()
if v is not None
}
return obj


class Reporter:
def __init__(self, issues: list, report_format: str):
self.issues = issues
self.format = report_format

def generate(self) -> str:
if self.format == 'json':
if self.format == "json":
return self.to_json()
if self.format == 'sarif':
if self.format == "sarif":
return self.to_sarif()
if self.format == 'html':
if self.format == "html":
return self.to_html()
return self.to_console()

# ------------------------------------------------------------------ #
# Console #
# ------------------------------------------------------------------ #

def to_console(self) -> str:
if not self.issues:
return "\nNo issues found."

output = []
severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]

# Define severity order (highest to lowest priority)
severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']

# Group issues by severity
issues_by_severity = {}
issues_by_severity: dict[str, list] = {}
for issue in self.issues:
severity = str(issue.severity).split('.')[-1].upper()
if severity not in issues_by_severity:
issues_by_severity[severity] = []
issues_by_severity[severity].append(issue)
severity = str(issue.severity).split(".")[-1].upper()
issues_by_severity.setdefault(severity, []).append(issue)

# Output grouped by severity (in priority order)
for severity in severity_order:
if severity not in issues_by_severity:
continue

issues = issues_by_severity[severity]
# Sort issues within each severity group by file path and line number
sorted_issues = sorted(issues, key=lambda i: (i.file_path, i.line_number))

# Add severity header
sorted_issues = sorted(
issues_by_severity[severity],
key=lambda i: (i.file_path, i.line_number),
)
output.append(f"\n{'='*60}")
output.append(f" {severity} ({len(sorted_issues)} issue{'s' if len(sorted_issues) != 1 else ''})")
output.append(
f" {severity} ({len(sorted_issues)} issue{'s' if len(sorted_issues) != 1 else ''})"
)
output.append(f"{'='*60}")

for issue in sorted_issues:
Expand All @@ -60,6 +97,10 @@ def to_console(self) -> str:

return "\n".join(output)

# ------------------------------------------------------------------ #
# JSON #
# ------------------------------------------------------------------ #

def to_json(self) -> str:
report = {
"summary": {"issue_count": len(self.issues)},
Expand All @@ -70,47 +111,128 @@ def to_json(self) -> str:
"file_path": issue.file_path,
"line_number": issue.line_number,
"code": issue.code,
"severity": str(issue.severity).split('.')[-1],
"severity": str(issue.severity).split(".")[-1],
"remediation": issue.remediation,
} for issue in self.issues
]
}
for issue in self.issues
],
}
return json.dumps(report, indent=2)

# ------------------------------------------------------------------ #
# SARIF #
# ------------------------------------------------------------------ #

def to_sarif(self) -> str:
    """Serialize the issues as a SARIF 2.1.0 log and return it as JSON text.

    The log contains a single run. The tool driver lists one rule per
    distinct ``rule_id`` in order of first appearance, and every issue
    becomes a result that refers back to its rule by id and by index.

    Severities are translated through ``_SEVERITY_TO_SARIF_LEVEL``; names
    missing from that table fall back to ``"warning"``. A rule's
    description, help text and default level come from the first issue seen
    for that rule.

    Returns:
        The SARIF document as an indented JSON string, with ``None``-valued
        attributes removed by ``_clean``.
    """

    def sarif_level(issue) -> str:
        # Only the part after the last "." of str(severity) is used, so a
        # value rendered as "Severity.HIGH" and a plain "high" both map.
        severity_key = str(issue.severity).split(".")[-1].upper()
        return _SEVERITY_TO_SARIF_LEVEL.get(severity_key, "warning")

    # 1. Build an ordered, de-duplicated rule list.
    rule_index_map: dict[str, int] = {}
    rules: list[ReportingDescriptor] = []

    for issue in self.issues:
        if issue.rule_id in rule_index_map:
            continue

        rule = ReportingDescriptor(
            id=issue.rule_id,
            name=issue.rule_id,
            short_description=MultiformatMessageString(text=issue.description),
            # Remediation guidance goes into `help`; when there is none the
            # description is reused and no markdown variant is emitted.
            help=MultiformatMessageString(
                text=issue.remediation or issue.description,
                markdown=(
                    f"**Remediation:** {issue.remediation}"
                    if issue.remediation
                    else None
                ),
            ),
            # Passed as a plain dict; it is serialized as-is.
            default_configuration={"level": sarif_level(issue)},
        )

        rule_index_map[issue.rule_id] = len(rules)
        rules.append(rule)

    # 2. Assemble the tool description.
    driver = ToolComponent(
        name="PySpector",
        version=_PYSPECTOR_VERSION,
        # NOTE(review): "your-org" looks like a placeholder — confirm the
        # real project URL.
        information_uri="https://github.com/your-org/pyspector",
        rules=rules,
    )
    tool = Tool(driver=driver)

    # 3. Turn every issue into a result.
    results: list[Result] = []

    for issue in self.issues:
        # An issue may carry no code; omit the snippet instead of failing
        # on ``None.strip()`` or emitting an empty snippet.
        code_text = (issue.code or "").strip()

        region = Region(
            start_line=issue.line_number,
            # NOTE(review): SARIF models a snippet as artifactContent; only
            # its `text` property is populated here — confirm this wrapper
            # type is acceptable to downstream consumers.
            snippet=(
                MultiformatMessageString(text=code_text) if code_text else None
            ),
        )

        location = Location(
            physical_location=PhysicalLocation(
                artifact_location=ArtifactLocation(
                    uri=issue.file_path,
                    # NOTE(review): presumes file_path is relative to the
                    # scanned source root — verify against the scanner.
                    uri_base_id="%SRCROOT%",
                ),
                region=region,
            )
        )

        results.append(
            Result(
                rule_id=issue.rule_id,
                rule_index=rule_index_map[issue.rule_id],
                level=sarif_level(issue),
                message=Message(text=issue.description),
                locations=[location],
            )
        )

    # 4. Compose the log.
    run = Run(tool=tool, results=results)
    log = SarifLog(
        version="2.1.0",
        schema_uri=(
            "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/"
            "master/Schemata/sarif-schema-2.1.0.json"
        ),
        runs=[run],
    )

    # 5. Serialize, stripping None values.
    return json.dumps(_clean(log), indent=2)

def to_html(self) -> str:
# A simple HTML report
html = f"""
<html>
<head><title>PySpector Scan Report</title></head>
Expand Down
Loading