Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test_cli_inspect_paths.py",
"test_cli_path_contract.py",
"test_cli_resume.py",
"test_cli_scout_url.py",
"test_cli_status.py",
"test_config.py",
"test_metadata_hash.py",
Expand Down
178 changes: 178 additions & 0 deletions tests/test_cli_scout_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from __future__ import annotations

import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any

from typer.testing import CliRunner

from toolkit.cli.app import app
from toolkit.cli.cmd_scout_url import probe_url


class _ScoutHandler(BaseHTTPRequestHandler):
    """In-process HTTP fixture exposing the routes the scout-url tests need."""

    def do_GET(self) -> None:  # noqa: N802
        # Route on the exact request path; anything unknown is a 404.
        if self.path == "/redirect-file":
            self._send_redirect()
        elif self.path == "/files/demo.csv":
            self._send_csv_file()
        elif self.path == "/html":
            self._send_html_page()
        elif self.path == "/opaque":
            self._send_opaque_bytes()
        else:
            self.send_response(404)
            self.end_headers()

    def _send_redirect(self) -> None:
        # One 302 hop so the CLI must follow redirects to reach the file.
        self.send_response(302)
        self.send_header("Location", "/files/demo.csv")
        self.end_headers()

    def _send_csv_file(self) -> None:
        # A small CSV with attachment disposition: the "file" classification case.
        payload = b"id,value\n1,10\n"
        self.send_response(200)
        self.send_header("Content-Type", "text/csv; charset=utf-8")
        self.send_header("Content-Disposition", 'attachment; filename="demo.csv"')
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_html_page(self) -> None:
        # An HTML page mixing absolute, relative, parent-relative,
        # protocol-relative and cross-origin links plus one non-file link.
        payload = b"""
            <html>
            <body>
            <a href="/downloads/data.csv">CSV</a>
            <a href="reports/report.xlsx">XLSX</a>
            <a href="../exports/out.csv">Parent CSV</a>
            <a href="//cdn.example.com/file.zip">CDN ZIP</a>
            <a href="https://example.org/api/data.json">JSON</a>
            <a href="/page">Page</a>
            </body>
            </html>
            """
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_opaque_bytes(self) -> None:
        # Bytes with a generic content type: the "opaque" classification case.
        payload = b"opaque-bytes"
        self.send_response(200)
        self.send_header("Content-Type", "application/octet-stream")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format: str, *args) -> None:  # noqa: A003
        # Suppress per-request stderr logging to keep test output clean.
        return


def _serve() -> tuple[ThreadingHTTPServer, str]:
    """Start the fixture server on an ephemeral localhost port.

    Returns the server (caller is responsible for shutdown/close) and the
    base URL to reach it.
    """
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _ScoutHandler)
    # Daemon thread so a forgotten server never blocks interpreter exit.
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    address, port = httpd.server_address
    return httpd, f"http://{address}:{port}"


def test_scout_url_reports_file_headers_after_redirect() -> None:
    """scout-url follows the 302 and reports the final file's headers."""
    server, base_url = _serve()
    try:
        outcome = CliRunner().invoke(app, ["scout-url", f"{base_url}/redirect-file"])
    finally:
        server.shutdown()
        server.server_close()

    assert outcome.exit_code == 0
    expected_lines = [
        f"requested_url: {base_url}/redirect-file",
        f"final_url: {base_url}/files/demo.csv",
        "status_code: 200",
        "content_type: text/csv; charset=utf-8",
        'content_disposition: attachment; filename="demo.csv"',
        "kind: file",
        "candidate_links: none",
    ]
    for line in expected_lines:
        assert line in outcome.output


def test_scout_url_extracts_candidate_links_from_html() -> None:
    """The HTML fixture yields absolute, file-like candidate links."""
    server, base_url = _serve()
    try:
        outcome = CliRunner().invoke(app, ["scout-url", f"{base_url}/html"])
    finally:
        server.shutdown()
        server.server_close()

    assert outcome.exit_code == 0
    assert f"final_url: {base_url}/html" in outcome.output
    assert "content_type: text/html; charset=utf-8" in outcome.output
    assert "kind: html" in outcome.output
    assert "candidate_links:" in outcome.output
    # Relative, parent-relative, protocol-relative and absolute hrefs must all
    # be resolved; the non-file "/page" link must not appear.
    expected_links = [
        f"  - {base_url}/downloads/data.csv",
        f"  - {base_url}/reports/report.xlsx",
        f"  - {base_url}/exports/out.csv",
        "  - http://cdn.example.com/file.zip",
        "  - https://example.org/api/data.json",
    ]
    for line in expected_links:
        assert line in outcome.output


def test_scout_url_marks_opaque_non_html_response() -> None:
    """A generic binary response is classified as opaque with no links."""
    server, base_url = _serve()
    try:
        outcome = CliRunner().invoke(app, ["scout-url", f"{base_url}/opaque"])
    finally:
        server.shutdown()
        server.server_close()

    assert outcome.exit_code == 0
    expected_lines = [
        f"final_url: {base_url}/opaque",
        "content_type: application/octet-stream",
        "content_disposition: None",
        "kind: opaque",
        "candidate_links: none",
    ]
    for line in expected_lines:
        assert line in outcome.output


def test_probe_url_uses_streaming_and_reads_body_only_for_html(monkeypatch) -> None:
    """probe_url must stream every request and read .text only for HTML."""
    recorded_kwargs: list[dict[str, Any]] = []

    class _StubResponse:
        def __init__(self, *, content_type: str, text: str = "") -> None:
            self.headers = {"Content-Type": content_type}
            self.url = "https://example.org/resource"
            self.status_code = 200
            self.encoding = None
            self.apparent_encoding = "utf-8"
            self._text = text
            # Counts accesses to .text so the test can prove the body was
            # (or was not) read.
            self.text_reads = 0

        @property
        def text(self) -> str:
            self.text_reads += 1
            return self._text

        def __enter__(self) -> "_StubResponse":
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    stubbed = [
        _StubResponse(content_type="application/octet-stream"),
        _StubResponse(content_type="text/html; charset=utf-8", text='<a href="/data.csv">CSV</a>'),
    ]

    def _record_get(*args, **kwargs):
        recorded_kwargs.append(kwargs)
        return stubbed[len(recorded_kwargs) - 1]

    monkeypatch.setattr("toolkit.cli.cmd_scout_url.requests.get", _record_get)

    opaque_report = probe_url("https://example.org/opaque", timeout=7)
    html_report = probe_url("https://example.org/html", timeout=7)

    assert opaque_report["kind"] == "opaque"
    assert html_report["kind"] == "html"
    assert html_report["candidate_links"] == ["https://example.org/data.csv"]
    for kwargs in recorded_kwargs:
        assert kwargs["stream"] is True
        assert kwargs["timeout"] == 7
    # The opaque response's body was never touched; the HTML one exactly once.
    assert stubbed[0].text_reads == 0
    assert stubbed[1].text_reads == 1
2 changes: 2 additions & 0 deletions toolkit/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from toolkit.cli.cmd_run import register as register_run
from toolkit.cli.cmd_profile import register as register_profile
from toolkit.cli.cmd_resume import register as register_resume
from toolkit.cli.cmd_scout_url import register as register_scout_url
from toolkit.cli.cmd_status import register as register_status
from toolkit.cli.cmd_validate import register as register_validate
from toolkit.cli.cmd_inspect import register as register_inspect
Expand All @@ -15,6 +16,7 @@
register_run(app)
register_profile(app)
register_resume(app)
register_scout_url(app)
register_status(app)
register_validate(app)
register_inspect(app)
Expand Down
137 changes: 137 additions & 0 deletions toolkit/cli/cmd_scout_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from __future__ import annotations

from html.parser import HTMLParser
from typing import Any
from urllib.parse import urljoin

import requests
import typer


# Extensions that mark a URL or link as a likely downloadable dataset file.
_CANDIDATE_EXTENSIONS = (".csv", ".xlsx", ".xls", ".zip", ".json")
# User-Agent sent with every probe so servers can identify the toolkit.
_DEFAULT_USER_AGENT = "dataciviclab-toolkit/scout-url"
# Default HTTP timeout in seconds (also the default of the --timeout option).
_DEFAULT_TIMEOUT = 10
# Maximum candidate links echoed by scout-url; the overflow is summarized.
_MAX_PRINTED_LINKS = 20


class _AnchorParser(HTMLParser):
    """Collect the href of every ``<a>`` tag seen while parsing."""

    def __init__(self) -> None:
        super().__init__()
        # Hrefs in document order; de-duplication happens downstream.
        self.hrefs: list[str] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag.lower() != "a":
            return
        # Keep the first non-empty href on the tag, if any.
        href = next(
            (value for name, value in attrs if name.lower() == "href" and value),
            None,
        )
        if href is not None:
            self.hrefs.append(href)


def _is_html(content_type: str | None) -> bool:
    """Return True when the Content-Type header value looks like HTML."""
    # A missing or empty header cannot be HTML; otherwise match the token
    # case-insensitively (covers text/html and application/xhtml+xml alike).
    return bool(content_type) and "html" in content_type.lower()


def _is_file_like(final_url: str, content_type: str | None, content_disposition: str | None) -> bool:
    """Heuristically decide whether a response looks like a downloadable file.

    Signals, checked in order: a candidate extension anywhere in the final
    URL, an ``attachment`` Content-Disposition, then file-ish tokens in a
    non-HTML Content-Type.
    """
    url_lower = final_url.lower()
    if any(ext in url_lower for ext in _CANDIDATE_EXTENSIONS):
        return True
    if content_disposition is not None and "attachment" in content_disposition.lower():
        return True
    # HTML (or an absent content type) never counts as a file here.
    if not content_type or _is_html(content_type):
        return False
    type_lower = content_type.lower()
    file_tokens = ("csv", "excel", "spreadsheetml", "zip", "json")
    return any(token in type_lower for token in file_tokens)


def _candidate_links(base_url: str, html_text: str) -> list[str]:
    """Extract absolute, de-duplicated links to file-like resources.

    Anchors whose href mentions a candidate extension are resolved against
    *base_url*; the first occurrence of each absolute URL wins, preserving
    document order.
    """
    collector = _AnchorParser()
    collector.feed(html_text)
    # An insertion-ordered dict doubles as an order-preserving set.
    ordered: dict[str, None] = {}
    for href in collector.hrefs:
        if any(ext in href.lower() for ext in _CANDIDATE_EXTENSIONS):
            ordered.setdefault(urljoin(base_url, href), None)
    return list(ordered)


def probe_url(url: str, *, timeout: int = _DEFAULT_TIMEOUT) -> dict[str, Any]:
    """Issue a streaming GET against *url* and classify the response.

    Returns a dict with the requested and final URLs, the status code, the
    raw Content-Type / Content-Disposition headers, a ``kind`` of ``html``,
    ``file`` or ``opaque``, and (for HTML only) candidate download links.
    Thanks to ``stream=True`` the body is downloaded only for HTML pages.

    Raises whatever ``requests`` raises on network failure.
    """
    with requests.get(
        url,
        allow_redirects=True,
        timeout=timeout,
        headers={"User-Agent": _DEFAULT_USER_AGENT},
        stream=True,
    ) as response:
        ctype = response.headers.get("Content-Type")
        disposition = response.headers.get("Content-Disposition")
        landed_url = response.url

        if _is_html(ctype):
            # Reading .text pulls the body; pick a usable encoding first.
            response.encoding = response.encoding or response.apparent_encoding or "utf-8"
            kind, links = "html", _candidate_links(landed_url, response.text)
        elif _is_file_like(landed_url, ctype, disposition):
            kind, links = "file", []
        else:
            kind, links = "opaque", []

        return {
            "requested_url": url,
            "final_url": landed_url,
            "status_code": response.status_code,
            "content_type": ctype,
            "content_disposition": disposition,
            "kind": kind,
            "candidate_links": links,
        }


def scout_url(
    url: str = typer.Argument(..., help="URL da ispezionare"),
    timeout: int = typer.Option(_DEFAULT_TIMEOUT, "--timeout", min=1, help="Timeout HTTP in secondi"),
) -> None:
    """
    Ispeziona un URL per dataset scouting minimale.
    """
    # NOTE: the docstring above doubles as the typer --help text, so it is
    # kept verbatim. Network failures exit with code 1 and a one-line error.
    try:
        report = probe_url(url, timeout=timeout)
    except requests.RequestException as exc:
        typer.echo(f"error: {type(exc).__name__}: {exc}")
        raise typer.Exit(code=1) from exc

    # Scalar fields first, in a fixed order.
    for field in (
        "requested_url",
        "final_url",
        "status_code",
        "content_type",
        "content_disposition",
        "kind",
    ):
        typer.echo(f"{field}: {report[field]}")

    links = report["candidate_links"]
    if not links:
        typer.echo("candidate_links: none")
        return

    typer.echo("candidate_links:")
    for link in links[:_MAX_PRINTED_LINKS]:
        typer.echo(f"  - {link}")
    # Summarize anything past the print cap instead of flooding stdout.
    overflow = len(links) - _MAX_PRINTED_LINKS
    if overflow > 0:
        typer.echo(f"candidate_links_more: {overflow}")


def register(app: typer.Typer) -> None:
    """Attach the ``scout-url`` command to the given Typer application."""
    command = app.command("scout-url")
    command(scout_url)
Loading