Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 190 additions & 0 deletions checker/plugins/checker_reporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""
Secure pytest reporter plugin for checker.
Uses pipe IPC to send results instead of temporary files.

Security model:
- Pipe-based IPC prevents students from reading report data
- Python -I flag (in python.py) blocks sitecustomize.py
- Security relies on: 1) pipe IPC, 2) python -I flag
"""

from __future__ import annotations

import json
import time
from typing import Any

import pytest


class CheckerReporterPlugin:
    """
    Pytest plugin that generates JSON report using protected json.dump.
    This prevents students from monkey-patching json.dump to fake results.

    Supports two modes:
    1. Pipe mode (secure): writes newline-delimited JSON to a FIFO pipe
       that is read by the checker process
    2. File mode (fallback): overwrites a regular file for compatibility

    The report is rewritten after every test so that an abrupt interpreter
    exit in student code still leaves a usable (partial) report behind.
    """

    def __init__(self, report_path: str, use_pipe: bool = False):
        # Path of either a regular report file or a pre-created FIFO.
        self.report_path = report_path
        self.use_pipe = use_pipe
        self.start_time = time.time()
        self.collected_items: list[Any] = []
        self.test_results: list[dict[str, Any]] = []
        # Aggregated counters serialized into every report update.
        self.summary: dict[str, int] = {
            "passed": 0,
            "failed": 0,
            "skipped": 0,
            "error": 0,
            "xfailed": 0,
            "xpassed": 0,
            "total": 0,
            "collected": 0,
        }
        self.pipe_fd: int | None = None

        # If using pipe, open it immediately for writing.
        # Pipe must already exist and have a reader on the other end.
        if self.use_pipe:
            import os

            try:
                # Open pipe in non-blocking mode initially to avoid hanging
                # when no reader is connected yet.
                self.pipe_fd = os.open(self.report_path, os.O_WRONLY | os.O_NONBLOCK)
                # Switch to blocking mode after successful open so later
                # writes are reliable.
                # FIX: fcntl is POSIX-only; catching ImportError below lets
                # us actually fall back to file mode on other platforms
                # (previously the ImportError escaped the except clause).
                import fcntl

                flags = fcntl.fcntl(self.pipe_fd, fcntl.F_GETFL)
                fcntl.fcntl(self.pipe_fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
            except (OSError, ImportError):
                # FIX: close an already-opened fd before falling back,
                # otherwise it leaks when only the fcntl step fails.
                if self.pipe_fd is not None:
                    try:
                        os.close(self.pipe_fd)
                    except OSError:
                        pass
                # If pipe is not available, fall back to file mode.
                self.use_pipe = False
                self.pipe_fd = None

    def pytest_collection_finish(self, session: pytest.Session) -> None:
        """Called after test collection is complete; record collected count."""
        self.collected_items = session.items
        self.summary["collected"] = len(session.items)

    def pytest_runtest_logreport(self, report: pytest.TestReport) -> None:
        """Called for each test phase (setup, call, teardown)."""
        if report.when != "call":
            # FIX: a failure outside the 'call' phase (e.g. a broken fixture
            # in setup, or a teardown crash) previously went completely
            # unrecorded.  pytest classifies such failures as errors.
            if report.outcome == "failed":
                self.test_results.append(
                    {
                        "nodeid": report.nodeid,
                        "outcome": "error",
                        "duration": report.duration,
                        "longrepr": str(report.longrepr) if report.longrepr else "",
                    }
                )
                self.summary["error"] += 1
                self._write_report()
            return

        outcome = report.outcome
        test_info = {
            "nodeid": report.nodeid,
            "outcome": outcome,
            "duration": report.duration,
        }

        if hasattr(report, "longrepr") and report.longrepr:
            test_info["longrepr"] = str(report.longrepr)

        self.test_results.append(test_info)

        # Update summary.  In the 'call' phase outcome is only ever
        # passed/failed/skipped, so the old branch counting
        # "error"/"xfailed"/"xpassed" here was unreachable.
        if outcome == "passed":
            self.summary["passed"] += 1
        elif outcome == "failed":
            self.summary["failed"] += 1
        elif outcome == "skipped":
            self.summary["skipped"] += 1

        # FIX: xfail/xpass are signalled via the `wasxfail` attribute on the
        # report (xfail -> skipped + wasxfail, xpass -> passed + wasxfail);
        # populate the previously dead summary keys from it.
        if hasattr(report, "wasxfail"):
            if outcome == "skipped":
                self.summary["xfailed"] += 1
            elif outcome == "passed":
                self.summary["xpassed"] += 1

        # CRITICAL: Write JSON after EACH test (incremental write).
        # This protects against sys.exit() in student's code.
        self._write_report()

    def _write_report(self) -> None:
        """
        Write JSON report using PROTECTED json.dump/dumps.
        Called after each test AND at session finish.

        In pipe mode: writes each update as a JSON line (last line wins)
        In file mode: overwrites the file with complete report
        """
        self.summary["total"] = len(self.test_results)

        report_data = {
            "created": self.start_time,
            "duration": time.time() - self.start_time,
            "summary": self.summary,
            "tests": self.test_results,
        }

        # Write report data as JSON
        # Security: python -I flag blocks sitecustomize.py, pipe IPC protects data
        try:
            if self.use_pipe and self.pipe_fd is not None:
                # Pipe mode: write JSON as a single line (newline-delimited JSON)
                # Each write overwrites the previous state
                import os

                payload = (json.dumps(report_data, ensure_ascii=False) + "\n").encode("utf-8")
                # FIX: os.write may write fewer bytes than requested once the
                # payload exceeds PIPE_BUF; loop so the reader never sees a
                # truncated JSON line.
                view = memoryview(payload)
                while view:
                    written = os.write(self.pipe_fd, view)
                    view = view[written:]
            else:
                # File mode: overwrite file with complete report
                with open(self.report_path, "w", encoding="utf-8") as f:
                    json.dump(report_data, f, indent=2, ensure_ascii=False)
        except OSError as e:
            # Log I/O errors to stderr for debugging.
            # Don't crash pytest, but make the error visible.
            import sys

            print(f"WARNING: Failed to write checker report: {e}", file=sys.stderr)
        except Exception as e:
            # Log unexpected errors without aborting the test session.
            import sys

            print(f"ERROR: Unexpected error in checker reporter: {type(e).__name__}: {e}", file=sys.stderr)

    @pytest.hookimpl(tryfirst=True)
    def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int) -> None:
        """
        Called after all tests are finished.
        Final write and cleanup.

        tryfirst=True ensures this runs BEFORE student's conftest.py hooks.
        """
        # Write final report
        self._write_report()

        # Close pipe if we opened it
        if self.pipe_fd is not None:
            try:
                import os

                os.close(self.pipe_fd)
            except OSError:
                pass
            finally:
                self.pipe_fd = None


def pytest_configure(config: pytest.Config) -> None:
    """
    Hook called by pytest to register our plugin.
    This is called EARLY, before conftest.py is loaded, so the reporter is
    in place before any student code can interfere.
    """
    path = config.getoption("--checker-report", default=None)
    if not path:
        # No report requested on the command line -- nothing to register.
        return
    pipe_mode = config.getoption("--checker-use-pipe", default=False)
    reporter = CheckerReporterPlugin(path, use_pipe=pipe_mode)
    config.pluginmanager.register(reporter, "checker_reporter")


def pytest_addoption(parser: pytest.Parser) -> None:
    """Add custom command line options for our plugin."""
    # (flag, keyword arguments) pairs, registered in order.
    option_specs = (
        (
            "--checker-report",
            {
                "action": "store",
                "default": None,
                "help": "Path to write secure JSON report (file or FIFO pipe)",
            },
        ),
        (
            "--checker-use-pipe",
            {
                "action": "store_true",
                "default": False,
                "help": "Use pipe mode for secure IPC instead of file",
            },
        ),
    )
    for flag, kwargs in option_specs:
        parser.addoption(flag, **kwargs)
161 changes: 161 additions & 0 deletions checker/plugins/python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from __future__ import annotations

import json
import os
import shlex
import tempfile
import threading
from pathlib import Path
from typing import Any

from pydantic import Field

from checker.exceptions import PluginExecutionFailed
from checker.plugins import PluginABC, PluginOutput
from checker.plugins.scripts import RunScriptPlugin


class RunPytestPlugin(RunScriptPlugin):
"""Plugin for running pytest."""

name = "run_pytest"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep this name, there is a risk that this plugin will override the plugins defined in the courses and everything will break. Check carefully, or rename it.


class Args(PluginABC.Args):
    # Passed through to RunScriptPlugin.Args.origin (run location of the script).
    origin: str
    # Path handed to pytest for collection; also used as the --cov target.
    target: str
    # Passed through to RunScriptPlugin.Args.timeout; presumably seconds,
    # None means no limit -- TODO confirm against RunScriptPlugin.
    timeout: int | None = None
    # NOTE(review): declared but never referenced in _run -- confirm intent.
    isolate: bool = False
    # Environment variable names passed through to RunScriptPlugin.Args.
    env_whitelist: list[str] = Field(default_factory=lambda: ["PATH"])

    # Truthy enables coverage reporting; a non-True value (an int) is
    # additionally used as the --cov-fail-under threshold.
    coverage: bool | int | None = None
    # NOTE(review): declared but never referenced in _run -- confirm intent.
    allow_failures: bool = False
    # When True, test results are read back over a FIFO pipe and the
    # passed/total ratio is stored in result.percentage.
    report_percentage: bool = True

def _run(self, args: Args, *, verbose: bool = False) -> PluginOutput:  # type: ignore[override]
    """
    Run pytest on ``args.target`` via RunScriptPlugin and return its output.

    When ``args.report_percentage`` is enabled, a FIFO pipe is created and
    a background thread collects incremental JSON reports written by the
    ``checker.plugins.checker_reporter`` pytest plugin; the fraction of
    passed tests is then stored in ``result.percentage``.

    Raises PluginExecutionFailed if the report cannot be read or is
    malformed.
    """
    # Use -I (isolated mode) to prevent sitecustomize.py and user
    # site-packages.  This blocks early monkey-patching attempts.
    tests_cmd = ["python", "-I", "-m", "pytest"]

    if not verbose:
        tests_cmd += ["--no-header", "--tb=no"]

    if args.coverage:
        tests_cmd += ["--cov-report", "term-missing"]
        tests_cmd += ["--cov", args.target]
        # A non-True value is a numeric threshold for --cov-fail-under.
        if args.coverage is not True:
            tests_cmd += ["--cov-fail-under", str(args.coverage)]
    else:
        tests_cmd += ["-p", "no:cov"]

    # Use FIFO pipe for secure IPC to get test results as percentage.
    # Only used when report_percentage=True (weighted test scoring).
    pipe_path = None
    report_data_holder: dict[str, Any] = {"data": None, "error": None}
    reader_thread = None

    try:
        if args.report_percentage:
            # Create a named pipe (FIFO) in the temp directory.  The
            # pid+id name makes collisions unlikely, though the path is
            # still guessable.
            temp_dir = Path(tempfile.gettempdir())
            pipe_path = temp_dir / f"checker_pipe_{os.getpid()}_{id(self)}"

            # Create FIFO pipe; only owner can read/write.
            os.mkfifo(str(pipe_path), mode=0o600)

            # Start reader thread BEFORE pytest starts, so a writer can
            # connect to the FIFO.
            reader_thread = threading.Thread(
                target=self._read_pipe_data, args=(pipe_path, report_data_holder), daemon=True
            )
            reader_thread.start()

            # Use our secure plugin with pipe mode.
            tests_cmd += ["-p", "checker.plugins.checker_reporter"]
            tests_cmd += ["--checker-report", str(pipe_path)]
            tests_cmd += ["--checker-use-pipe"]

        # FIX: the command is executed through a shell, so each argument
        # must be quoted -- previously a target path containing spaces or
        # shell metacharacters would break (or be interpreted by) the
        # shell.  shlex.quote leaves plain flags unchanged.
        script_cmd = " ".join(shlex.quote(part) for part in tests_cmd + [args.target])

        if args.report_percentage:
            # Ignore pytest's exit status: scoring is based on the report.
            script_cmd = f"{script_cmd} || true"

        run_script_args = RunScriptPlugin.Args(
            origin=args.origin,
            script=script_cmd,
            timeout=args.timeout,
            env_whitelist=args.env_whitelist,
        )
        result = super()._run(run_script_args, verbose=verbose)

        if reader_thread:
            # Bounded wait: the reader is a daemon thread, so a stuck pipe
            # cannot hang the checker.
            reader_thread.join(timeout=5.0)

        if args.report_percentage:
            report_data = report_data_holder.get("data")
            error = report_data_holder.get("error")

            if error:
                raise PluginExecutionFailed(f"Failed to read report from pipe: {error}")
            if not report_data:
                raise PluginExecutionFailed("No report data received from pytest plugin")
            if not isinstance(report_data, dict):  # type: ignore[unreachable]
                raise PluginExecutionFailed(
                    f"Invalid report data type: expected dict, got {type(report_data).__name__}"
                )

            # FIX: the old try/except KeyError wrapper here was dead code --
            # every lookup below uses .get() and cannot raise KeyError.
            summary = report_data.get("summary", {})
            if not isinstance(summary, dict):
                raise PluginExecutionFailed(
                    f"Invalid summary type: expected dict, got {type(summary).__name__}"
                )

            passed = summary.get("passed", 0)
            total = summary.get("total", 0)

            if not isinstance(passed, (int, float)) or not isinstance(total, (int, float)):
                raise PluginExecutionFailed(f"Invalid test counts: passed={passed!r}, total={total!r}")

            # FIX: 0.0 (not int 0) so percentage is consistently a float.
            result.percentage = passed / total if total > 0 else 0.0

        return result

    finally:
        # Clean up the FIFO even when pytest or report parsing fails.
        if pipe_path is not None and pipe_path.exists():
            try:
                pipe_path.unlink()
            except OSError:
                pass

@staticmethod
def _read_pipe_data(pipe_path: Path, result_holder: dict[str, Any]) -> None:
    """
    Background-thread target: consume newline-delimited JSON from the pipe.

    Each line is a full report snapshot, so the last successfully parsed
    line wins.  On success the parsed object is stored in
    ``result_holder['data']``; any failure is recorded as a string in
    ``result_holder['error']``.
    """
    latest: Any = None
    try:
        # Opening a FIFO for reading blocks until a writer connects.
        with open(pipe_path, "r", encoding="utf-8") as stream:
            for raw_line in stream:
                candidate = raw_line.strip()
                if not candidate:
                    continue
                try:
                    latest = json.loads(candidate)
                except json.JSONDecodeError:
                    # Ignore malformed lines; keep the previous valid data.
                    continue

        result_holder["data"] = latest
    except Exception as e:
        result_holder["error"] = str(e)
Loading