diff --git a/tests/conftest.py b/tests/conftest.py
index d8b8e67..8423ebf 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -8,6 +8,7 @@
     "test_cli_inspect_paths.py",
     "test_cli_path_contract.py",
     "test_cli_resume.py",
+    "test_cli_scout_url.py",
     "test_cli_status.py",
     "test_config.py",
     "test_metadata_hash.py",
diff --git a/tests/test_cli_scout_url.py b/tests/test_cli_scout_url.py
new file mode 100644
index 0000000..29a40c8
--- /dev/null
+++ b/tests/test_cli_scout_url.py
@@ -0,0 +1,178 @@
+from __future__ import annotations
+
+import threading
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from typing import Any
+
+from typer.testing import CliRunner
+
+from toolkit.cli.app import app
+from toolkit.cli.cmd_scout_url import probe_url
+
+
+class _ScoutHandler(BaseHTTPRequestHandler):
+    def do_GET(self) -> None:  # noqa: N802
+        if self.path == "/redirect-file":
+            self.send_response(302)
+            self.send_header("Location", "/files/demo.csv")
+            self.end_headers()
+            return
+
+        if self.path == "/files/demo.csv":
+            body = b"id,value\n1,10\n"
+            self.send_response(200)
+            self.send_header("Content-Type", "text/csv; charset=utf-8")
+            self.send_header("Content-Disposition", 'attachment; filename="demo.csv"')
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            return
+
+        if self.path == "/html":
+            body = b"""
+<html>
+<body>
+<a href="/downloads/data.csv">CSV</a>
+<a href="reports/report.xlsx">XLSX</a>
+<a href="../exports/out.csv">Parent CSV</a>
+<a href="http://cdn.example.com/file.zip">CDN ZIP</a>
+<a href="https://example.org/api/data.json">JSON</a>
+<a href="/about.html">Page</a>
+</body>
+</html>
+"""
+            self.send_response(200)
+            self.send_header("Content-Type", "text/html; charset=utf-8")
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            return
+
+        if self.path == "/opaque":
+            body = b"opaque-bytes"
+            self.send_response(200)
+            self.send_header("Content-Type", "application/octet-stream")
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            return
+
+        self.send_response(404)
+        self.end_headers()
+
+    def log_message(self, format: str, *args) -> None:  # noqa: A003
+        return
+
+
+def _serve() -> tuple[ThreadingHTTPServer, str]:
+    server = ThreadingHTTPServer(("127.0.0.1", 0), _ScoutHandler)
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+    host, port = server.server_address
+    return server, f"http://{host}:{port}"
+
+
+def test_scout_url_reports_file_headers_after_redirect() -> None:
+    server, base_url = _serve()
+    runner = CliRunner()
+    try:
+        result = runner.invoke(app, ["scout-url", f"{base_url}/redirect-file"])
+    finally:
+        server.shutdown()
+        server.server_close()
+
+    assert result.exit_code == 0
+    assert f"requested_url: {base_url}/redirect-file" in result.output
+    assert f"final_url: {base_url}/files/demo.csv" in result.output
+    assert "status_code: 200" in result.output
+    assert "content_type: text/csv; charset=utf-8" in result.output
+    assert 'content_disposition: attachment; filename="demo.csv"' in result.output
+    assert "kind: file" in result.output
+    assert "candidate_links: none" in result.output
+
+
+def test_scout_url_extracts_candidate_links_from_html() -> None:
+    server, base_url = _serve()
+    runner = CliRunner()
+    try:
+        result = runner.invoke(app, ["scout-url", f"{base_url}/html"])
+    finally:
+        server.shutdown()
+        server.server_close()
+
+    assert result.exit_code == 0
+    assert f"final_url: {base_url}/html" in result.output
+    assert "content_type: text/html; charset=utf-8" in result.output
+    assert "kind: html" in result.output
+    assert "candidate_links:" in result.output
+    assert f"  - {base_url}/downloads/data.csv" in result.output
+    assert f"  - {base_url}/reports/report.xlsx" in result.output
+    assert f"  - {base_url}/exports/out.csv" in result.output
+    assert "  - http://cdn.example.com/file.zip" in result.output
+    assert "  - https://example.org/api/data.json" in result.output
+
+
+def test_scout_url_marks_opaque_non_html_response() -> None:
+    server, base_url = _serve()
+    runner = CliRunner()
+    try:
+        result = runner.invoke(app, ["scout-url", f"{base_url}/opaque"])
+    finally:
+        server.shutdown()
+        server.server_close()
+
+    assert result.exit_code == 0
+    assert f"final_url: {base_url}/opaque" in result.output
+    assert "content_type: application/octet-stream" in result.output
+    assert "content_disposition: None" in result.output
+    assert "kind: opaque" in result.output
+    assert "candidate_links: none" in result.output
+
+
+def test_probe_url_uses_streaming_and_reads_body_only_for_html(monkeypatch) -> None:
+    calls: list[dict[str, Any]] = []
+
+    class _FakeResponse:
+        def __init__(self, *, content_type: str, text: str = "") -> None:
+            self.headers = {"Content-Type": content_type}
+            self.url = "https://example.org/resource"
+            self.status_code = 200
+            self.encoding = None
+            self.apparent_encoding = "utf-8"
+            self._text = text
+            self.text_reads = 0
+
+        @property
+        def text(self) -> str:
+            self.text_reads += 1
+            return self._text
+
+        def __enter__(self) -> "_FakeResponse":
+            return self
+
+        def __exit__(self, exc_type, exc, tb) -> None:
+            return None
+
+    responses = [
+        _FakeResponse(content_type="application/octet-stream"),
+        _FakeResponse(content_type="text/html; charset=utf-8", text='<a href="data.csv">CSV</a>'),
+    ]
+
+    def _fake_get(*args, **kwargs):
+        calls.append(kwargs)
+        return responses[len(calls) - 1]
+
+    monkeypatch.setattr("toolkit.cli.cmd_scout_url.requests.get", _fake_get)
+
+    opaque = probe_url("https://example.org/opaque", timeout=7)
+    html = probe_url("https://example.org/html", timeout=7)
+
+    assert opaque["kind"] == "opaque"
+    assert html["kind"] == "html"
+    assert html["candidate_links"] == ["https://example.org/data.csv"]
+    assert calls[0]["stream"] is True
+    assert calls[1]["stream"] is True
+    assert calls[0]["timeout"] == 7
+    assert calls[1]["timeout"] == 7
+    assert responses[0].text_reads == 0
+    assert responses[1].text_reads == 1
diff --git a/toolkit/cli/app.py b/toolkit/cli/app.py
index 4173862..c72017d 100644
--- a/toolkit/cli/app.py
+++ b/toolkit/cli/app.py
@@ -5,6 +5,7 @@
 from toolkit.cli.cmd_run import register as register_run
 from toolkit.cli.cmd_profile import register as register_profile
 from toolkit.cli.cmd_resume import register as register_resume
+from toolkit.cli.cmd_scout_url import register as register_scout_url
 from toolkit.cli.cmd_status import register as register_status
 from toolkit.cli.cmd_validate import register as register_validate
 from toolkit.cli.cmd_inspect import register as register_inspect
@@ -15,6 +16,7 @@
 register_run(app)
 register_profile(app)
 register_resume(app)
+register_scout_url(app)
 register_status(app)
 register_validate(app)
 register_inspect(app)
diff --git a/toolkit/cli/cmd_scout_url.py b/toolkit/cli/cmd_scout_url.py
new file mode 100644
index 0000000..a1ff41f
--- /dev/null
+++ b/toolkit/cli/cmd_scout_url.py
@@ -0,0 +1,137 @@
+from __future__ import annotations
+
+from html.parser import HTMLParser
+from typing import Any
+from urllib.parse import urljoin
+
+import requests
+import typer
+
+
+_CANDIDATE_EXTENSIONS = (".csv", ".xlsx", ".xls", ".zip", ".json")
+_DEFAULT_USER_AGENT = "dataciviclab-toolkit/scout-url"
+_DEFAULT_TIMEOUT = 10
+_MAX_PRINTED_LINKS = 20
+
+
+class _AnchorParser(HTMLParser):
+    def __init__(self) -> None:
+        super().__init__()
+        self.hrefs: list[str] = []
+
+    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
+        if tag.lower() != "a":
+            return
+        for key, value in attrs:
+            if key.lower() == "href" and value:
+                self.hrefs.append(value)
+                return
+
+
+def _is_html(content_type: str | None) -> bool:
+    if not content_type:
+        return False
+    return "html" in content_type.lower()
+
+
+def _is_file_like(final_url: str, content_type: str | None, content_disposition: str | None) -> bool:
+    lowered_url = final_url.lower()
+    if any(ext in lowered_url for ext in _CANDIDATE_EXTENSIONS):
+        return True
+    if content_disposition and "attachment" in content_disposition.lower():
+        return True
+    if content_type and not _is_html(content_type):
+        lowered_type = content_type.lower()
+        return any(
+            token in lowered_type
+            for token in (
+                "csv",
+                "excel",
+                "spreadsheetml",
+                "zip",
+                "json",
+            )
+        )
+    return False
+
+
+def _candidate_links(base_url: str, html_text: str) -> list[str]:
+    parser = _AnchorParser()
+    parser.feed(html_text)
+    links: list[str] = []
+    seen: set[str] = set()
+    for href in parser.hrefs:
+        lowered = href.lower()
+        if not any(ext in lowered for ext in _CANDIDATE_EXTENSIONS):
+            continue
+        absolute = urljoin(base_url, href)
+        if absolute in seen:
+            continue
+        seen.add(absolute)
+        links.append(absolute)
+    return links
+
+
+def probe_url(url: str, *, timeout: int = _DEFAULT_TIMEOUT) -> dict[str, Any]:
+    headers = {"User-Agent": _DEFAULT_USER_AGENT}
+    with requests.get(url, allow_redirects=True, timeout=timeout, headers=headers, stream=True) as response:
+        content_type = response.headers.get("Content-Type")
+        content_disposition = response.headers.get("Content-Disposition")
+        final_url = response.url
+        is_html = _is_html(content_type)
+
+        if is_html:
+            response.encoding = response.encoding or response.apparent_encoding or "utf-8"
+            candidate_links = _candidate_links(final_url, response.text)
+            kind = "html"
+        elif _is_file_like(final_url, content_type, content_disposition):
+            candidate_links = []
+            kind = "file"
+        else:
+            candidate_links = []
+            kind = "opaque"
+
+        return {
+            "requested_url": url,
+            "final_url": final_url,
+            "status_code": response.status_code,
+            "content_type": content_type,
+            "content_disposition": content_disposition,
+            "kind": kind,
+            "candidate_links": candidate_links,
+        }
+
+
+def scout_url(
+    url: str = typer.Argument(..., help="URL da ispezionare"),
+    timeout: int = typer.Option(_DEFAULT_TIMEOUT, "--timeout", min=1, help="Timeout HTTP in secondi"),
+) -> None:
+    """
+    Ispeziona un URL per dataset scouting minimale.
+    """
+    try:
+        result = probe_url(url, timeout=timeout)
+    except requests.RequestException as exc:
+        typer.echo(f"error: {type(exc).__name__}: {exc}")
+        raise typer.Exit(code=1) from exc
+
+    typer.echo(f"requested_url: {result['requested_url']}")
+    typer.echo(f"final_url: {result['final_url']}")
+    typer.echo(f"status_code: {result['status_code']}")
+    typer.echo(f"content_type: {result['content_type']}")
+    typer.echo(f"content_disposition: {result['content_disposition']}")
+    typer.echo(f"kind: {result['kind']}")
+
+    if result["candidate_links"]:
+        typer.echo("candidate_links:")
+        for link in result["candidate_links"][:_MAX_PRINTED_LINKS]:
+            typer.echo(f"  - {link}")
+        remaining = len(result["candidate_links"]) - _MAX_PRINTED_LINKS
+        if remaining > 0:
+            typer.echo(f"candidate_links_more: {remaining}")
+    else:
+        typer.echo("candidate_links: none")
+
+
+def register(app: typer.Typer) -> None:
+    app.command("scout-url")(scout_url)